From d0a83b29d466514ee147df2d4e0435e311457e3e Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Fri, 20 Dec 2024 12:01:05 -0500 Subject: [PATCH] Nested hints tests, handle table name overrides --- dlt/extract/extractors.py | 11 ++-- dlt/extract/hints.py | 24 +++++--- tests/pipeline/test_table_hints.py | 98 ++++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 14 deletions(-) create mode 100644 tests/pipeline/test_table_hints.py diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index f42067b4c4..b10d740dbb 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -421,7 +421,7 @@ def _compute_table( for item in reversed(items): computed_tables = super()._compute_table(resource, item, Any) for computed_table in computed_tables: - arrow_table = arrow_tables.get(computed_table['name']) + arrow_table = arrow_tables.get(computed_table["name"]) # Merge the columns to include primary_key and other hints that may be set on the resource if arrow_table: utils.merge_table(self.schema.name, computed_table, arrow_table) @@ -431,7 +431,10 @@ def _compute_table( # Add load_id column if needed dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID) - if self._normalize_config.add_dlt_load_id and dlt_load_id not in arrow_table["columns"]: + if ( + self._normalize_config.add_dlt_load_id + and dlt_load_id not in arrow_table["columns"] + ): # will be normalized line below arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column() @@ -446,8 +449,8 @@ def _compute_table( if src_hint != hint: override_warn = True logger.info( - f"In resource: {resource.name}, when merging arrow schema on" - f" column {col_name}. The hint {hint_name} value" + f"In resource: {resource.name}, when merging arrow schema" + f" on column {col_name}. The hint {hint_name} value" f" {src_hint} defined in resource will overwrite arrow hint" f" with value {hint}." ) diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index b37bca3173..ea78d4688a 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -259,15 +259,15 @@ def parent_table_name(self) -> TTableHintTemplate[str]: return None if self._hints is None else self._hints.get("parent") def _walk_nested_hints( - self, path: List[str] = None - ) -> Iterator[Tuple[List[str], "DltResourceHints"]]: + self, path: Tuple[str] = None + ) -> Iterator[Tuple[Tuple[str, ...], "DltResourceHints"]]: """Walk nested hints recursively to generate a flat iterator of path and `DltResourceHints` instance pairs""" if path is None: - path = [] + path = tuple() if path: yield path, self for key, nested_instance in self._nested_hints.items(): - yield from nested_instance._walk_nested_hints(path + [key]) + yield from nested_instance._walk_nested_hints(path + (key,)) def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: """Computes the table schema based on hints and column definitions passed during resource creation. @@ -320,12 +320,16 @@ def compute_table_chain(self, item: TDataItem = None, meta: Any = None) -> List[ else: root_table_name = root_table["name"] result = [root_table] + path_to_name: Dict[Tuple[str, ...], str] = {(root_table_name,): root_table_name} for path, instance in self._walk_nested_hints(): - full_path = [root_table_name] + path - table_name = "__".join(full_path) # TODO: naming convention + full_path = (root_table_name,) + path table = instance.compute_table_schema(item, meta) - table["name"] = table_name - parent_name = "__".join(full_path[:-1]) + if not table.get("name"): + table["name"] = "__".join(full_path) # TODO: naming convention + path_to_name[full_path] = table["name"] + parent_name = path_to_name[full_path[:-1]] + + # parent_name = "__".join(full_path[:-1]) table["parent"] = parent_name result.append(table) @@ -369,7 +373,7 @@ def apply_hints( hints_instance: DltResourceHints if path: path = tuple(path) - hints_instance = self._nested_hints.get(path) + hints_instance = self._nested_hints.get(path) # type: ignore[call-overload] if not hints_instance: hints_instance = DltResourceHints() else: @@ -666,7 +670,7 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> """Creates table schema from resource hints and resource name. Resource hints are resolved (do not contain callables) and will be modified in place """ - resource_hints["name"] = resource_hints.pop("table_name") + resource_hints["name"] = resource_hints.pop("table_name") # type: ignore[typeddict-unknown-key] DltResourceHints._merge_keys(resource_hints) if "write_disposition" in resource_hints: if isinstance(resource_hints["write_disposition"], str): diff --git a/tests/pipeline/test_table_hints.py b/tests/pipeline/test_table_hints.py new file mode 100644 index 0000000000..ce0cda5d28 --- /dev/null +++ b/tests/pipeline/test_table_hints.py @@ -0,0 +1,98 @@ +import dlt + + +@dlt.resource +def nested_data(): + yield [ + { + "id": 1, + "a": [ + { + "a_id": "2", + "b": [{"b_id": "3"}], + } + ], + "c": [ + { + "c_id": "4", + "d": [{"d_id": "5"}], + } + ], + } + ] + + +def test_apply_hints_nested_hints_column_types() -> None: + nested_data_rs = nested_data() + + nested_data_rs.apply_hints( + path=["a", "b"], + columns=[ + { + "name": "b_id", + "data_type": "bigint", + }, + ], + ) + nested_data_rs.apply_hints( + path=["c"], + columns=[ + { + "name": "c_id", + "data_type": "double", + }, + ], + ) + + pipeline = dlt.pipeline( + pipeline_name="test_apply_hints_nested_hints", dev_mode=True, destination="duckdb" + ) + pipeline.run(nested_data_rs) + + schema_tables = pipeline.default_schema.tables + + assert schema_tables["nested_data__a__b"]["columns"]["b_id"]["data_type"] == "bigint" + assert schema_tables["nested_data__c"]["columns"]["c_id"]["data_type"] == "double" + + assert schema_tables["nested_data__a__b"]["parent"] == "nested_data__a" + assert schema_tables["nested_data__c"]["parent"] == "nested_data" + + # Try changing the parent name + nested_data_rs.apply_hints(table_name="override_parent") + + pipeline = dlt.pipeline( + pipeline_name="test_apply_hints_nested_hints_2", dev_mode=True, destination="duckdb" + ) + pipeline.run(nested_data_rs) + + schema_tables = pipeline.default_schema.tables + + assert schema_tables["override_parent__a__b"]["parent"] == "override_parent__a" + assert schema_tables["override_parent__c"]["parent"] == "override_parent" + assert schema_tables["override_parent__a__b"]["name"] == "override_parent__a__b" + assert schema_tables["override_parent__a__b"]["columns"]["b_id"]["data_type"] == "bigint" + + for key in schema_tables.keys(): + assert not key.startswith("nested_data") + + +def test_apply_hints_nested_hints_override_child_name(): + nested_data_rs = nested_data() + + # Override both levels of child tables + nested_data_rs.apply_hints(path=["a"], table_name="override_child_a") + nested_data_rs.apply_hints(path=["a", "b"], table_name="override_child_a_b") + + pipeline = dlt.pipeline( + pipeline_name="test_apply_hints_nested_hints_3", dev_mode=True, destination="duckdb" + ) + + pipeline.run(nested_data_rs) + + schema_tables = pipeline.default_schema.tables + + assert schema_tables["override_child_a_b"]["name"] == "override_child_a_b" + # Parent should match the overrid parent name + assert schema_tables["override_child_a_b"]["parent"] == "override_child_a" + + assert schema_tables["override_child_a"]["name"] == "override_child_a"