Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catalog to config #4323

Merged
merged 88 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
ae5384a
Captured init arguments
ElenaKhaustova Nov 7, 2024
fcdf357
Implemented unresoloving credentials
ElenaKhaustova Nov 7, 2024
b147374
Added some comments
ElenaKhaustova Nov 7, 2024
0ed0c1e
Put type in first place for dataset config
ElenaKhaustova Nov 7, 2024
3c839a9
Handled version key
ElenaKhaustova Nov 7, 2024
29bb714
Added lazy dataset to_config
ElenaKhaustova Nov 8, 2024
49858b6
Removed data key from MemoryDataset
ElenaKhaustova Nov 8, 2024
0d0ba91
Added TODOs
ElenaKhaustova Nov 8, 2024
8413f58
Saved call args
ElenaKhaustova Nov 11, 2024
a89db7e
Saved only set credentials
ElenaKhaustova Nov 11, 2024
b081c65
Processed CachedDataset case
ElenaKhaustova Nov 11, 2024
18c0ad6
Updated TODOs
ElenaKhaustova Nov 12, 2024
2751ea8
Tested with PartitionedDataset
ElenaKhaustova Nov 12, 2024
fc576ff
Popped metadata
ElenaKhaustova Nov 12, 2024
1d6454c
Fixed versioning when load
ElenaKhaustova Nov 12, 2024
8c31237
Fixed linter
ElenaKhaustova Nov 12, 2024
e035881
Tested datasets factories
ElenaKhaustova Nov 13, 2024
edcdc38
Tested transcoding
ElenaKhaustova Nov 13, 2024
15a1e72
Removed TODOs
ElenaKhaustova Nov 13, 2024
d4e4534
Removed debug output
ElenaKhaustova Nov 13, 2024
e7e8af5
Removed debug output
ElenaKhaustova Nov 13, 2024
1b6be8e
Added logic to set VERSIONED_FLAG_KEY
ElenaKhaustova Nov 14, 2024
db77436
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 14, 2024
c6dc380
Updated version set up
ElenaKhaustova Nov 14, 2024
54b0793
Added TODO for versioning
ElenaKhaustova Nov 14, 2024
f183e60
Added tests for unresolve_config_credentials
ElenaKhaustova Nov 14, 2024
0d9d241
Implemented test_to_config
ElenaKhaustova Nov 14, 2024
763e635
Added test with MemoryDataset
ElenaKhaustova Nov 14, 2024
8795dd6
Extended test examples
ElenaKhaustova Nov 14, 2024
e3289b4
Materialized cached_ds
ElenaKhaustova Nov 14, 2024
5d93a41
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 14, 2024
59b603e
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 15, 2024
ae62886
Exclude parameters
ElenaKhaustova Nov 18, 2024
b2ebfe2
Fixed import
ElenaKhaustova Nov 18, 2024
a07107a
Added test with parameters
ElenaKhaustova Nov 18, 2024
5dc4abf
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 19, 2024
e5adb5d
Moved tests for CatalogConfigResolver to a separate file
ElenaKhaustova Nov 19, 2024
bdf45a3
Made unresolve_config_credentials staticmethod
ElenaKhaustova Nov 19, 2024
33d6791
Updated comment to clarify meaning
ElenaKhaustova Nov 19, 2024
33ff979
Moved to_config anfter from_config
ElenaKhaustova Nov 19, 2024
7546540
Returned is_parameter for catalog and added TODOs
ElenaKhaustova Nov 19, 2024
c37c04d
Renamed catalog config resolver methods
ElenaKhaustova Nov 20, 2024
591f4a0
Implemented _validate_versions method
ElenaKhaustova Nov 21, 2024
5aaebe6
Added _validate_versions calls
ElenaKhaustova Nov 21, 2024
bdb7cf6
Updated error descriptions
ElenaKhaustova Nov 21, 2024
e2ffeaa
Added validation to the old catalog
ElenaKhaustova Nov 22, 2024
6b1e802
Fixed linter
ElenaKhaustova Nov 22, 2024
06e343b
Implemented unit tests for KedroDataCatalog
ElenaKhaustova Nov 22, 2024
5492b9f
Removed odd comments
ElenaKhaustova Nov 22, 2024
c96546c
Implemented tests for DataCatalog
ElenaKhaustova Nov 22, 2024
46f2df6
Added docstrings
ElenaKhaustova Nov 22, 2024
56a067c
Added release notes
ElenaKhaustova Nov 22, 2024
cbd1d4a
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 22, 2024
e9027b9
Updated version logic
ElenaKhaustova Nov 22, 2024
11b148b
Added CachedDataset case
ElenaKhaustova Nov 22, 2024
163ca17
Merge branch 'main' into fix/4327-validate-datasets-versions
ElenaKhaustova Nov 27, 2024
ca2ac6c
Updated release notes
ElenaKhaustova Nov 27, 2024
615d135
Added tests for CachedDataset use case
ElenaKhaustova Nov 27, 2024
8dd1084
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 27, 2024
8a01881
Updated unit test after version validation is applied
ElenaKhaustova Nov 27, 2024
eb44a30
Removed MemoryDatasets
ElenaKhaustova Nov 27, 2024
ba3d04e
Removed _is_parameter
ElenaKhaustova Nov 27, 2024
35953a9
Pop metadata from cached dataset configuration
ElenaKhaustova Nov 27, 2024
d56793b
Fixed lint
ElenaKhaustova Nov 27, 2024
ebf1483
Fixed unit test
ElenaKhaustova Nov 27, 2024
f5468c9
Added docstrings for AbstractDataset.to_config()
ElenaKhaustova Nov 27, 2024
edee597
Updated docstrings
ElenaKhaustova Nov 27, 2024
4454970
Fixed typos
ElenaKhaustova Nov 27, 2024
5d6bd3c
Updated TODOs
ElenaKhaustova Nov 27, 2024
f1ace7c
Merge branch 'main' into fix/4327-validate-datasets-versions
ElenaKhaustova Nov 27, 2024
95fc260
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 27, 2024
86e25e9
Added docstring for KedroDataCatalog.to_config
ElenaKhaustova Nov 27, 2024
c8fd99e
Added docstrinbgs for unresolve_credentials
ElenaKhaustova Nov 27, 2024
5b2f21f
Updated release notes
ElenaKhaustova Nov 27, 2024
35dc102
Fixed indentation
ElenaKhaustova Nov 27, 2024
2853fda
Fixed to_config() example
ElenaKhaustova Nov 27, 2024
8f0fe4f
Fixed indentation
ElenaKhaustova Nov 27, 2024
0db9b46
Fixed indentation
ElenaKhaustova Nov 27, 2024
2f72e23
Added a note about to_config() constraints
ElenaKhaustova Nov 27, 2024
33f29fd
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 28, 2024
a7689b9
Fixed typo
ElenaKhaustova Nov 28, 2024
3c3664e
Replace type string with the constant
ElenaKhaustova Nov 28, 2024
b7183ab
Replace type string with the constant
ElenaKhaustova Nov 28, 2024
171e80f
Moved _is_memory_dataset
ElenaKhaustova Nov 28, 2024
b789018
Simplified nested decorator
ElenaKhaustova Nov 28, 2024
6ba6ee4
Fixed lint
ElenaKhaustova Nov 28, 2024
7008346
Removed _init_args class attribute
ElenaKhaustova Nov 29, 2024
7af15a5
Returned @wraps
ElenaKhaustova Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kedro/framework/cli/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def _create_session(package_name: str, **kwargs: Any) -> KedroSession:


def is_parameter(dataset_name: str) -> bool:
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
# TODO: when breaking change replace with is_parameter from kedro/io/core.py
"""Check if dataset is a parameter."""
return dataset_name.startswith("params:") or dataset_name == "parameters"

Expand Down
30 changes: 27 additions & 3 deletions kedro/io/catalog_config_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,9 @@ def _extract_patterns(

return sorted_patterns, user_default

@classmethod
def _resolve_config_credentials(
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
self,
cls,
config: dict[str, dict[str, Any]] | None,
credentials: dict[str, dict[str, Any]] | None,
) -> dict[str, dict[str, Any]]:
Expand All @@ -254,13 +255,36 @@ def _resolve_config_credentials(
"\nHint: If this catalog entry is intended for variable interpolation, "
"make sure that the key is preceded by an underscore."
)
if not self.is_pattern(ds_name):
resolved_configs[ds_name] = self._resolve_credentials(
if not cls.is_pattern(ds_name):
resolved_configs[ds_name] = cls._resolve_credentials(
ds_config, credentials
)

return resolved_configs

@staticmethod
def unresolve_config_credentials(
merelcht marked this conversation as resolved.
Show resolved Hide resolved
cred_name: str, ds_config: dict[str, dict[str, Any]] | None
) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
ds_config_copy = copy.deepcopy(ds_config) or {}
credentials: dict[str, Any] = {}
credentials_ref = f"{cred_name}_{CREDENTIALS_KEY}"

def unresolve(config: Any) -> None:
# We don't expect credentials key appears more than once within the same dataset config,
# So once we found the key first time we unresolve it and stop iterating after
for key, val in config.items():
if key == CREDENTIALS_KEY and config[key]:
credentials[credentials_ref] = config[key]
config[key] = credentials_ref
return
if isinstance(val, dict):
unresolve(val)

unresolve(ds_config_copy)

return ds_config_copy, credentials

def resolve_pattern(self, ds_name: str) -> dict[str, Any]:
"""Resolve dataset patterns and return resolved configurations based on the existing patterns."""
matched_pattern = self.match_pattern(ds_name)
Expand Down
74 changes: 72 additions & 2 deletions kedro/io/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datetime import datetime, timezone
from functools import partial, wraps
from glob import iglob
from inspect import getcallargs
from operator import attrgetter
from pathlib import Path, PurePath, PurePosixPath
from typing import (
Expand Down Expand Up @@ -57,6 +58,7 @@
"s3a",
"s3n",
)
TYPE_KEY = "type"


class DatasetError(Exception):
Expand Down Expand Up @@ -148,6 +150,11 @@ class AbstractDataset(abc.ABC, Generic[_DI, _DO]):
need to change the `_EPHEMERAL` attribute to 'True'.
"""
_EPHEMERAL = False
_init_args: dict[str, Any] | None = None
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self, call_args: dict[str, Any]) -> None:
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
self._init_args = call_args
self._init_args.pop("self", None)

@classmethod
def from_config(
Expand Down Expand Up @@ -201,6 +208,49 @@ def from_config(
) from err
return dataset

def to_config(self) -> tuple[dict[str, Any], dict[str, str] | None, str | None]:
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
return_config: dict[str, Any] = {
f"{TYPE_KEY}": f"{type(self).__module__}.{type(self).__name__}"
}
load_versions: dict[str, str] | None = None
save_version: str | None = None

if self._init_args:
return_config.update(self._init_args)

if type(self).__name__ == "CachedDataset":
cached_ds = return_config.pop("dataset")
cached_ds_return_config: dict[str, Any] = {}
if isinstance(cached_ds, dict):
cached_ds_return_config = cached_ds
elif isinstance(cached_ds, AbstractDataset):
cached_ds_return_config, load_versions, save_version = (
cached_ds.to_config()
)
if VERSIONED_FLAG_KEY in cached_ds_return_config:
return_config[VERSIONED_FLAG_KEY] = cached_ds_return_config.pop(
VERSIONED_FLAG_KEY
)
return_config["dataset"] = cached_ds_return_config

version = return_config.pop(VERSION_KEY, None)

if version:
return_config[VERSIONED_FLAG_KEY] = True
load_versions, save_version = (
load_versions or version.load,
save_version or version.save,
)

# Pop data from configuration
if type(self).__name__ == "MemoryDataset":
return_config.pop("data", None)

# Pop metadata from configuration
return_config.pop("metadata", None)

return return_config, load_versions, save_version

@property
def _logger(self) -> logging.Logger:
return logging.getLogger(__name__)
Expand Down Expand Up @@ -286,6 +336,20 @@ def __init_subclass__(cls, **kwargs: Any) -> None:
If `_load` or `_save` are defined, alias them as a prerequisite.

"""

init_func: Callable = cls.__init__

def init_decorator(previous_init: Callable) -> Callable:
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
def new_init(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
previous_init(self, *args, **kwargs)
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
if type(self) is cls:
call_args = getcallargs(init_func, self, *args, **kwargs)
self.__post_init__(call_args)
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved

return new_init

cls.__init__ = init_decorator(cls.__init__) # type: ignore[method-assign]
astrojuanlu marked this conversation as resolved.
Show resolved Hide resolved

super().__init_subclass__(**kwargs)

if hasattr(cls, "_load") and not cls._load.__qualname__.startswith("Abstract"):
Expand Down Expand Up @@ -484,14 +548,14 @@ def parse_dataset_definition(
config = copy.deepcopy(config)

# TODO: remove when removing old catalog as moved to KedroDataCatalog
if "type" not in config:
if TYPE_KEY not in config:
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
raise DatasetError(
"'type' is missing from dataset catalog configuration."
"\nHint: If this catalog entry is intended for variable interpolation, "
"make sure that the top level key is preceded by an underscore."
)

dataset_type = config.pop("type")
dataset_type = config.pop(TYPE_KEY)
class_obj = None
if isinstance(dataset_type, str):
if len(dataset_type.strip(".")) != len(dataset_type):
Expand Down Expand Up @@ -955,3 +1019,9 @@ def confirm(self, name: str) -> None:
def shallow_copy(self, extra_dataset_patterns: Patterns | None = None) -> _C:
"""Returns a shallow copy of the current object."""
...


def _is_parameter(dataset_name: str) -> bool:
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
# TODO: when breaking change replace with is_parameter and remove is_parameter from kedro/framework/cli/catalog.py
"""Check if dataset is a parameter."""
return dataset_name.startswith("params:") or dataset_name == "parameters"
49 changes: 48 additions & 1 deletion kedro/io/kedro_data_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

from kedro.io.catalog_config_resolver import CatalogConfigResolver, Patterns
from kedro.io.core import (
VERSIONED_FLAG_KEY,
AbstractDataset,
AbstractVersionedDataset,
CatalogProtocol,
DatasetAlreadyExistsError,
DatasetError,
DatasetNotFoundError,
Version,
_is_parameter,
generate_timestamp,
)
from kedro.io.memory_dataset import MemoryDataset
Expand Down Expand Up @@ -96,7 +98,7 @@ def __init__(
>>> catalog = KedroDataCatalog(datasets={"cars": cars})
"""
self._config_resolver = config_resolver or CatalogConfigResolver()
self._datasets = datasets or {}
self._datasets: dict[str, AbstractDataset] = datasets or {}
self._lazy_datasets: dict[str, _LazyDataset] = {}
self._load_versions = load_versions or {}
self._save_version = save_version
Expand Down Expand Up @@ -364,6 +366,51 @@ class to be loaded is specified with the key ``type`` and their
config_resolver=config_resolver,
)

def to_config(
self,
) -> tuple[
dict[str, dict[str, Any]],
dict[str, dict[str, Any]],
dict[str, str | None],
dict[str, str | None],
]:
catalog: dict[str, dict[str, Any]] = {}
credentials: dict[str, dict[str, Any]] = {}
load_version: dict[str, str | None] = {}
save_version: dict[str, str | None] = {}

for ds_name, ds in self._lazy_datasets.items():
if _is_parameter(ds_name):
continue
unresolved_config, unresolved_credentials = (
self._config_resolver.unresolve_config_credentials(ds_name, ds.config)
)
catalog[ds_name] = unresolved_config
credentials.update(unresolved_credentials)
# TODO: Update when #4327 resolved
if catalog[ds_name].get(VERSIONED_FLAG_KEY, None):
load_version[ds_name] = ds.load_version
save_version[ds_name] = ds.save_version
else:
load_version[ds_name] = None
save_version[ds_name] = None

for ds_name, ds in self._datasets.items(): # type: ignore[assignment]
if _is_parameter(ds_name):
continue
resolved_config, cur_load_versions, cur_save_version = ds.to_config() # type: ignore[attr-defined]
unresolved_config, unresolved_credentials = (
self._config_resolver.unresolve_config_credentials(
ds_name, resolved_config
)
)
catalog[ds_name] = unresolved_config
credentials.update(unresolved_credentials)
load_version[ds_name] = cur_load_versions
save_version[ds_name] = cur_save_version

return catalog, credentials, load_version, save_version

@staticmethod
def _validate_dataset_config(ds_name: str, ds_config: Any) -> None:
if not isinstance(ds_config, dict):
Expand Down
46 changes: 46 additions & 0 deletions tests/io/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,52 @@ def correct_config(filepath):
}


@pytest.fixture
def correct_config_versioned(filepath):
return {
"catalog": {
"boats": {
"type": "pandas.CSVDataset",
"filepath": filepath,
"versioned": True,
},
"cars": {
"type": "pandas.CSVDataset",
"filepath": "s3://test_bucket/test_file.csv",
"credentials": "cars_credentials",
},
"cars_ibis": {
"type": "ibis.FileDataset",
"filepath": "cars_ibis.csv",
"file_format": "csv",
"table_name": "cars",
"connection": {"backend": "duckdb", "database": "company.db"},
"load_args": {"sep": ",", "nullstr": "#NA"},
"save_args": {"sep": ",", "nullstr": "#NA"},
},
"cached_ds": {
"type": "kedro.io.cached_dataset.CachedDataset",
"versioned": True,
"dataset": {
"type": "pandas.CSVDataset",
"filepath": "cached_ds.csv",
"credentials": "cached_ds_credentials",
},
"copy_mode": None,
},
"parameters": {
"type": "kedro.io.memory_dataset.MemoryDataset",
"data": [4, 5, 6],
"copy_mode": None,
},
},
"credentials": {
"cars_credentials": {"key": "FAKE_ACCESS_KEY", "secret": "FAKE_SECRET_KEY"},
"cached_ds_credentials": {"key": "KEY", "secret": "SECRET"},
},
}


@pytest.fixture
def correct_config_with_nested_creds(correct_config):
correct_config["catalog"]["cars"]["credentials"] = {
Expand Down
38 changes: 38 additions & 0 deletions tests/io/test_catalog_config_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from kedro.io import CatalogConfigResolver


class TestCatalogConfigResolver:
def test_unresolve_config_credentials(self, correct_config):
"""Test unresolve dataset credentials to original format."""
config = correct_config["catalog"]
credentials = correct_config["credentials"]
resolved_configs = CatalogConfigResolver._resolve_config_credentials(
config, credentials
)

unresolved_config, unresolved_credentials = (
CatalogConfigResolver.unresolve_config_credentials(
cred_name="s3", ds_config=resolved_configs
)
)
assert config == unresolved_config
assert credentials == unresolved_credentials

def test_unresolve_config_credentials_two_keys(self, correct_config):
"""Test unresolve dataset credentials to original format when two credentials keys provided."""
config = correct_config["catalog"]
credentials = correct_config["credentials"]

resolved_configs = CatalogConfigResolver._resolve_config_credentials(
config, credentials
)
resolved_configs["cars"]["metadata"] = {"credentials": {}}

unresolved_config, unresolved_credentials = (
CatalogConfigResolver.unresolve_config_credentials(
cred_name="s3", ds_config=resolved_configs
)
)
unresolved_config["cars"].pop("metadata")
assert config == unresolved_config
assert credentials == unresolved_credentials
Loading