Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dictutils: add filter_dict_keys #321

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions invenio_records/dictutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

"""Dictionary utilities."""

from copy import deepcopy


def clear_none(d):
"""Clear None values and empty dicts from a dict."""
Expand Down Expand Up @@ -147,3 +145,37 @@ def dict_merge(dest, source):
dict_merge(dest[key], source[key])
else:
dest[key] = source[key]


def filter_dict_keys(src, keys):
"""Filter a dictionary based on a list of key paths."""
# Split the keys into top-level and nested keys
top_level_keys = [key for key in keys if "." not in key]
nested_keys = [key for key in keys if "." in key]

# Filter the top-level keys
result = {key: src[key] for key in top_level_keys if key in src}

# Handle nested keys
for key in nested_keys:
parts = key.split(".")
current_dict = src
for part in parts[:-1]:
if part in current_dict:
current_dict = current_dict[part]
else:
break # Skip this key if the path does not exist
# Update the filtered dictionary with the nested key if it exists
if parts[-2] in result and parts[-1] in current_dict:
if parts[-2] not in result:
result[parts[-2]] = {}
result[parts[-2]][parts[-1]] = current_dict[parts[-1]]

# Handle specific case for top-level keys that are dictionaries but not explicitly mentioned
for key in src:
if key not in result and isinstance(src[key], dict):
subkeys = [k.split(".", 1)[1] for k in keys if k.startswith(f"{key}.")]
if subkeys:
result[key] = filter_dict_keys(src[key], subkeys)

return result
21 changes: 20 additions & 1 deletion tests/test_dictutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@

import pytest

from invenio_records.dictutils import clear_none, dict_lookup, dict_merge
from invenio_records.dictutils import (
clear_none,
dict_lookup,
dict_merge,
filter_dict_keys,
)


def test_clear_none():
Expand Down Expand Up @@ -78,3 +83,17 @@ def test_dict_merge():
"foo2": "bar2",
"metadata": {"field1": 3, "field2": "test"},
}


def test_filter_dict_keys():
"""Test filter dict keys."""
source = {
"foo1": {"bar1": 1, "bar2": 2},
"foo2": 1,
"foo3": {"bar1": {"foo4": 0}, "bar2": 1, "bar3": 2},
"foo4": {},
}

assert filter_dict_keys(
source, ["foo1.bar1", "foo2", "foo3.bar1.foo4", "foo3.bar3"]
) == {"foo1": {"bar1": 1}, "foo2": 1, "foo3": {"bar1": {"foo4": 0}, "bar3": 2}}
6 changes: 3 additions & 3 deletions tests/test_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def local_ref_resolver_store_factory():

@pytest.fixture(scope="module")
def app_config(app_config):
app_config[
"RECORDS_REFRESOLVER_CLS"
] = "invenio_records.resolver.InvenioRefResolver"
app_config["RECORDS_REFRESOLVER_CLS"] = (
"invenio_records.resolver.InvenioRefResolver"
)
app_config["RECORDS_REFRESOLVER_STORE"] = local_ref_resolver_store_factory()
return app_config

Expand Down
Loading