From 1ea2af25890d454ebcca67d2c1842064d2aaa991 Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Wed, 11 Dec 2024 08:05:27 -0500 Subject: [PATCH 1/8] unifies ResourceHints typed dict --- dlt/extract/extractors.py | 2 +- dlt/extract/hints.py | 70 ++++++++++++++++++++++++---------- dlt/sources/rest_api/typing.py | 2 - 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index 03f8a31462..9f911f1d75 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -132,7 +132,7 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No # convert to table meta if created table variant so item is assigned to this table if meta.create_table_variant: # name in hints meta must be a string, otherwise merge_hints would fail - meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type] + meta = TableNameMeta(meta.hints["table_name"]) # type: ignore[arg-type] self._reset_contracts_cache() if table_name := self._get_static_table_name(resource, meta): diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 22a0062acf..358a3110d0 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -44,24 +44,24 @@ class TResourceHintsBase(TypedDict, total=False): + table_name: Optional[TTableHintTemplate[str]] write_disposition: Optional[TTableHintTemplate[TWriteDispositionConfig]] parent: Optional[TTableHintTemplate[str]] primary_key: Optional[TTableHintTemplate[TColumnNames]] + columns: Optional[TTableHintTemplate[TAnySchemaColumns]] schema_contract: Optional[TTableHintTemplate[TSchemaContract]] table_format: Optional[TTableHintTemplate[TTableFormat]] + file_format: TTableHintTemplate[TFileFormat] merge_key: Optional[TTableHintTemplate[TColumnNames]] references: Optional[TTableHintTemplate[TTableReferenceParam]] + nested_hints: Optional[Dict[str, "TResourceHintsBase"]] class TResourceHints(TResourceHintsBase, total=False): - name: TTableHintTemplate[str] # description: 
TTableHintTemplate[str] - # table_sealed: Optional[bool] - columns: TTableHintTemplate[TTableSchemaColumns] - incremental: Incremental[Any] - file_format: TTableHintTemplate[TFileFormat] + incremental: Optional[Incremental[Any]] validator: ValidateItem - original_columns: TTableHintTemplate[TAnySchemaColumns] + original_columns: Optional[TTableHintTemplate[TAnySchemaColumns]] class HintsMeta: @@ -94,6 +94,7 @@ def make_hints( """ validator, schema_contract = create_item_validator(columns, schema_contract) # create a table schema template where hints can be functions taking TDataItem + # TODO: do not use new_table here and get rid if typing ignores new_template: TResourceHints = new_table( table_name, # type: ignore parent_table_name, # type: ignore @@ -103,8 +104,9 @@ def make_hints( file_format=file_format, # type: ignore references=references, # type: ignore ) + new_template["table_name"] = new_template.pop("name") # type: ignore if not table_name: - new_template.pop("name") + del new_template["table_name"] if not write_disposition and "write_disposition" in new_template: new_template.pop("write_disposition") # remember original columns and set template columns @@ -125,12 +127,34 @@ def make_hints( return new_template +class DltResourceHintsDict(Dict[str, "DltResourceHints"]): + # def __init__(self, initial_value: TResourceHintsBase) + + def __getitem__(self, key: Union[str, Sequence[str]]) -> "DltResourceHints": + """Get item at `key` is string or recursively if sequence""" + if isinstance(key, str): + return super().__getitem__(key) + else: + item = super().__getitem__(key[0]) + for k_ in key[1:]: + item = item.nested_hints[k_] + return item + + def __setitem__(self, key: str, value: Union["DltResourceHints", TResourceHintsBase]) -> None: + """Sets resource hints at given `key` or create new instance from table template""" + if isinstance(value, DltResourceHints): + return super().__setitem__(key, value) + else: + return super().__setitem__(key, 
DltResourceHints(value)) # type: ignore + + class DltResourceHints: def __init__(self, table_schema_template: TResourceHints = None): self.__qualname__ = self.__name__ = self.name self._table_name_hint_fun: TFunHintTemplate[str] = None self._table_has_other_dynamic_hints: bool = False self._hints: TResourceHints = None + self._nested_hints: DltResourceHintsDict = None """Hints for the resource""" self._hints_variants: Dict[str, TResourceHints] = {} """Hints for tables emitted from resources""" @@ -147,7 +171,7 @@ def table_name(self) -> TTableHintTemplate[str]: if self._table_name_hint_fun: return self._table_name_hint_fun # get table name or default name - return self._hints.get("name") or self.name if self._hints else self.name + return self._hints.get("table_name") or self.name if self._hints else self.name @table_name.setter def table_name(self, value: TTableHintTemplate[str]) -> None: @@ -166,7 +190,11 @@ def write_disposition(self, value: TTableHintTemplate[TWriteDispositionConfig]) @property def columns(self) -> TTableHintTemplate[TTableSchemaColumns]: """Gets columns' schema that can be modified in place""" - return None if self._hints is None else self._hints.get("columns") + return None if self._hints is None else self._hints.get("columns") # type: ignore[return-value] + + @property + def nested_hints(self) -> DltResourceHintsDict: + pass @property def schema_contract(self) -> TTableHintTemplate[TSchemaContract]: @@ -187,16 +215,16 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab """ if isinstance(meta, TableNameMeta): # look for variant - table_template = self._hints_variants.get(meta.table_name, self._hints) + root_table_template = self._hints_variants.get(meta.table_name, self._hints) else: - table_template = self._hints - if not table_template: + root_table_template = self._hints + if not root_table_template: return new_table(self.name, resource=self.name) # resolve a copy of a held template - table_template = 
self._clone_hints(table_template) - if "name" not in table_template: - table_template["name"] = self.name + root_table_template = self._clone_hints(root_table_template) + if "table_name" not in root_table_template: + root_table_template["table_name"] = self.name # if table template present and has dynamic hints, the data item must be provided. if self._table_name_hint_fun and item is None: @@ -204,7 +232,7 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab # resolve resolved_template: TResourceHints = { k: self._resolve_hint(item, v) - for k, v in table_template.items() + for k, v in root_table_template.items() if k not in NATURAL_CALLABLES } # type: ignore if "incremental" in table_template: @@ -290,9 +318,9 @@ def apply_hints( t = self._clone_hints(t) if table_name is not None: if table_name: - t["name"] = table_name + t["table_name"] = table_name else: - t.pop("name", None) + t.pop("table_name", None) if parent_table_name is not None: if parent_table_name: t["parent"] = parent_table_name @@ -310,6 +338,7 @@ def apply_hints( # normalize columns columns = ensure_table_schema_columns(columns) # this updates all columns with defaults + assert isinstance(t["columns"], dict) t["columns"] = merge_columns(t["columns"], columns, merge_columns=True) else: # set to empty columns @@ -379,7 +408,8 @@ def _set_hints( DltResourceHints.validate_write_disposition_hint(hints_template) DltResourceHints.validate_reference_hint(hints_template) if create_table_variant: - table_name: str = hints_template["name"] # type: ignore[assignment] + # for table variants, table name must be a str + table_name: str = hints_template["table_name"] # type: ignore[assignment] # incremental cannot be specified in variant if hints_template.get("incremental"): raise InconsistentTableTemplate( @@ -413,7 +443,7 @@ def merge_hints( self, hints_template: TResourceHints, create_table_variant: bool = False ) -> None: self.apply_hints( - table_name=hints_template.get("name"), 
+ table_name=hints_template.get("table_name"), parent_table_name=hints_template.get("parent"), write_disposition=hints_template.get("write_disposition"), columns=hints_template.get("original_columns"), diff --git a/dlt/sources/rest_api/typing.py b/dlt/sources/rest_api/typing.py index c48e54de4a..1d53ffa666 100644 --- a/dlt/sources/rest_api/typing.py +++ b/dlt/sources/rest_api/typing.py @@ -273,9 +273,7 @@ class ProcessingSteps(TypedDict): class ResourceBase(TResourceHintsBase, total=False): """Defines hints that may be passed to `dlt.resource` decorator""" - table_name: Optional[TTableHintTemplate[str]] max_table_nesting: Optional[int] - columns: Optional[TTableHintTemplate[TAnySchemaColumns]] selected: Optional[bool] parallelized: Optional[bool] processing_steps: Optional[List[ProcessingSteps]] From 66a54afd0104f7b57341332415b710e52d8b8339 Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Wed, 11 Dec 2024 08:28:01 -0500 Subject: [PATCH 2/8] Apply nested hints and compute table chain from nested hints --- dlt/common/schema/typing.py | 2 + dlt/extract/extractors.py | 85 ++++++++++++----------- dlt/extract/hints.py | 130 +++++++++++++++++++++++++++++++----- dlt/extract/resource.py | 7 +- 4 files changed, 168 insertions(+), 56 deletions(-) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 6f5d6213c9..d27dabb965 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -262,6 +262,8 @@ class TTableReference(TypedDict): class _TTableSchemaBase(TTableProcessingHints, total=False): name: Optional[str] + path: Optional[Tuple[str, ...]] + """Path of nested table""" description: Optional[str] schema_contract: Optional[TSchemaContract] table_sealed: Optional[bool] diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index 9f911f1d75..bc5c0422ff 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -227,56 +227,63 @@ def _write_to_static_table( else: self._write_item(table_name, 
resource.name, items) - def _compute_table(self, resource: DltResource, items: TDataItems, meta: Any) -> TTableSchema: + def _compute_table( + self, resource: DltResource, items: TDataItems, meta: Any + ) -> List[TTableSchema]: """Computes a schema for a new or dynamic table and normalizes identifiers""" - return utils.normalize_table_identifiers( - resource.compute_table_schema(items, meta), self.schema.naming - ) + return [ + utils.normalize_table_identifiers(tbl, self.schema.naming) + for tbl in resource.compute_table_chain(items, meta) + ] def _compute_and_update_table( - self, resource: DltResource, table_name: str, items: TDataItems, meta: Any + self, resource: DltResource, root_table_name: str, items: TDataItems, meta: Any ) -> TDataItems: """ Computes new table and does contract checks, if false is returned, the table may not be created and no items should be written """ - computed_table = self._compute_table(resource, items, meta) - # overwrite table name (if coming from meta) - computed_table["name"] = table_name - # get or compute contract - schema_contract = self._table_contracts.setdefault( - table_name, self.schema.resolve_contract_settings_for_table(table_name, computed_table) - ) - - # this is a new table so allow evolve once - if schema_contract["columns"] != "evolve" and self.schema.is_new_table(table_name): - computed_table["x-normalizer"] = {"evolve-columns-once": True} - existing_table = self.schema.tables.get(table_name, None) - if existing_table: - # TODO: revise this. computed table should overwrite certain hints (ie. 
primary and merge keys) completely - diff_table = utils.diff_table(self.schema.name, existing_table, computed_table) - else: - diff_table = computed_table + computed_tables = self._compute_table(resource, items, meta) + + for computed_table in computed_tables: + # # overwrite table name (if coming from meta) + # computed_table["name"] = table_name # TODO: don't remove this + table_name = computed_table["name"] + # get or compute contract + schema_contract = self._table_contracts.setdefault( + table_name, + self.schema.resolve_contract_settings_for_table(table_name, computed_table), + ) - # apply contracts - diff_table, filters = self.schema.apply_schema_contract( - schema_contract, diff_table, data_item=items - ) + # this is a new table so allow evolve once + if schema_contract["columns"] != "evolve" and self.schema.is_new_table(table_name): + computed_table["x-normalizer"] = {"evolve-columns-once": True} + existing_table = self.schema.tables.get(table_name, None) + if existing_table: + # TODO: revise this. computed table should overwrite certain hints (ie. 
primary and merge keys) completely + diff_table = utils.diff_table(self.schema.name, existing_table, computed_table) + else: + diff_table = computed_table - # merge with schema table - if diff_table: - # diff table identifiers already normalized - self.schema.update_table( - diff_table, normalize_identifiers=False, from_diff=bool(existing_table) + # apply contracts + diff_table, filters = self.schema.apply_schema_contract( + schema_contract, diff_table, data_item=items ) - # process filters - if filters: - for entity, name, mode in filters: - if entity == "tables": - self._filtered_tables.add(name) - elif entity == "columns": - filtered_columns = self._filtered_columns.setdefault(table_name, {}) - filtered_columns[name] = mode + # merge with schema table + if diff_table: + # diff table identifiers already normalized + self.schema.update_table( + diff_table, normalize_identifiers=False, from_diff=bool(existing_table) + ) + + # process filters + if filters: + for entity, name, mode in filters: + if entity == "tables": + self._filtered_tables.add(name) + elif entity == "columns": + filtered_columns = self._filtered_columns.setdefault(table_name, {}) + filtered_columns[name] = mode return items def _reset_contracts_cache(self) -> None: diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 358a3110d0..8ecae69cb6 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -1,4 +1,16 @@ -from typing import TypedDict, cast, Any, Optional, Dict, Sequence, Mapping, Union +from typing import ( + TypedDict, + cast, + Any, + Optional, + Dict, + Sequence, + Mapping, + Union, + Tuple, + Iterator, + List, +) from typing_extensions import Self from dlt.common import logger @@ -127,6 +139,31 @@ def make_hints( return new_template +# class DltResourceHintsDict(Dict[Tuple[str, ...], "DltResourceHints"]): +# # def __init__(self, initial_value: TResourceHintsBase) + +# # def __getitem__(self, key: Union[str, Sequence[str]]) -> "DltResourceHints": +# # """Get item at `key` 
is string or recursively if sequence""" +# # if isinstance(key, str): +# # return super().__getitem__(key) +# # else: +# # item = super().__getitem__(key[0]) +# # for k_ in key[1:]: +# # item = item.nested_hints[k_] +# # return item + +# def __setitem__(self, key: Union[str, Sequence[str]], value: Union["DltResourceHints", TResourceHintsBase]) -> None: +# """Sets resource hints at given `key` or create new instance from table template""" +# if isinstance(key, str): +# key = (key, ) +# else: +# key = tuple(key) +# if isinstance(value, DltResourceHints): +# return super().__setitem__(key, value) +# else: +# return super().__setitem__(key, DltResourceHints(value)) # type: ignore + + class DltResourceHintsDict(Dict[str, "DltResourceHints"]): # def __init__(self, initial_value: TResourceHintsBase) @@ -137,15 +174,28 @@ def __getitem__(self, key: Union[str, Sequence[str]]) -> "DltResourceHints": else: item = super().__getitem__(key[0]) for k_ in key[1:]: - item = item.nested_hints[k_] + item = item._nested_hints[k_] return item - def __setitem__(self, key: str, value: Union["DltResourceHints", TResourceHintsBase]) -> None: + def __setitem__( + self, key: Union[str, Sequence[str]], value: Union["DltResourceHints", TResourceHintsBase] + ) -> None: """Sets resource hints at given `key` or create new instance from table template""" - if isinstance(value, DltResourceHints): + if not isinstance(value, DltResourceHints): + value = DltResourceHints(value) # type: ignore[arg-type] + + if isinstance(key, str): return super().__setitem__(key, value) - else: - return super().__setitem__(key, DltResourceHints(value)) # type: ignore + + parent_dict = self + for part in key[:-1]: + if part not in parent_dict: + parent = parent_dict[part] = DltResourceHints(make_hints()) + else: + parent = parent_dict[part] + parent_dict = parent._nested_hints + + parent_dict[key[-1]] = value class DltResourceHints: @@ -154,7 +204,7 @@ def __init__(self, table_schema_template: TResourceHints = None): 
self._table_name_hint_fun: TFunHintTemplate[str] = None self._table_has_other_dynamic_hints: bool = False self._hints: TResourceHints = None - self._nested_hints: DltResourceHintsDict = None + self._nested_hints: DltResourceHintsDict = DltResourceHintsDict() """Hints for the resource""" self._hints_variants: Dict[str, TResourceHints] = {} """Hints for tables emitted from resources""" @@ -208,6 +258,17 @@ def table_format(self) -> TTableHintTemplate[TTableFormat]: def parent_table_name(self) -> TTableHintTemplate[str]: return None if self._hints is None else self._hints.get("parent") + def _walk_nested_hints( + self, path: List[str] = None + ) -> Iterator[Tuple[List[str], "DltResourceHints"]]: + """Walk nested hints recursively to generate a flat iterator of path and `DltResourceHints` instance pairs""" + if path is None: + path = [] + if path: + yield path, self + for key, nested_instance in self._nested_hints.items(): + yield from nested_instance._walk_nested_hints(path + [key]) + def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: """Computes the table schema based on hints and column definitions passed during resource creation. `item` parameter is used to resolve table hints based on data. 
@@ -235,8 +296,8 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab for k, v in root_table_template.items() if k not in NATURAL_CALLABLES } # type: ignore - if "incremental" in table_template: - incremental = table_template["incremental"] + if "incremental" in root_table_template: + incremental = root_table_template["incremental"] if isinstance(incremental, Incremental) and incremental is not Incremental.EMPTY: resolved_template["incremental"] = incremental table_schema = self._create_table_schema(resolved_template, self.name) @@ -246,8 +307,27 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab doc=table_schema, path=f"new_table/{self.name}", ) + return table_schema + def compute_table_chain(self, item: TDataItem = None, meta: Any = None) -> List[TTableSchema]: + """Compute the table schema based on the current and all nested hints. + Nested hints are resolved recursively. + """ + root_table = self.compute_table_schema(item, meta) + root_table_name = root_table["name"] + result = [root_table] + for path, instance in self._walk_nested_hints(): + full_path = [root_table_name] + path + table_name = "__".join(full_path) # TODO: naming convention + table = instance.compute_table_schema(item, meta) + table["name"] = table_name + parent_name = "__".join(full_path[:-1]) + table["parent"] = parent_name + + result.append(table) + return result + def apply_hints( self, table_name: TTableHintTemplate[str] = None, @@ -263,6 +343,7 @@ def apply_hints( file_format: TTableHintTemplate[TFileFormat] = None, references: TTableHintTemplate[TTableReferenceParam] = None, create_table_variant: bool = False, + path: Sequence[str] = None, ) -> Self: """Creates or modifies existing table schema by setting provided hints. Accepts both static and dynamic hints based on data. 
@@ -282,6 +363,14 @@ def apply_hints( Returns: self for chaining """ + hints_instance: DltResourceHints + if path: + path = tuple(path) + hints_instance = self._nested_hints.get(path) + if not hints_instance: + hints_instance = DltResourceHints() + else: + hints_instance = self if create_table_variant: if not isinstance(table_name, str): raise ValueError( @@ -289,15 +378,15 @@ def apply_hints( " hints" ) # select hints variant - t = self._hints_variants.get(table_name, None) + t = hints_instance._hints_variants.get(table_name, None) if t is None: # use resource hints as starting point - if self._hints: - t = self._clone_hints(self._hints) + if hints_instance._hints: + t = hints_instance._clone_hints(self._hints) # but remove callables t = {n: h for n, h in t.items() if not callable(h)} # type: ignore[assignment] else: - t = self._hints + t = hints_instance._hints if t is None: # if there is no template yet, create and set a new one. @@ -315,18 +404,22 @@ def apply_hints( references, ) else: - t = self._clone_hints(t) + t = hints_instance._clone_hints(t) if table_name is not None: if table_name: t["table_name"] = table_name else: t.pop("table_name", None) if parent_table_name is not None: + if path: + raise ValueError("Parent table name cannot be set on nested tables") if parent_table_name: t["parent"] = parent_table_name else: t.pop("parent", None) if write_disposition: + if path: + raise ValueError("Write disposition cannot be set on nested tables") t["write_disposition"] = write_disposition if columns is not None: # keep original columns: i.e. in case it is a Pydantic model. 
@@ -374,6 +467,8 @@ def apply_hints( if schema_contract is not None: t["schema_contract"] = schema_contract if table_format is not None: + if path: + raise ValueError("Table format cannot be set on nested tables") if table_format: t["table_format"] = table_format else: @@ -396,9 +491,13 @@ def apply_hints( # set properties that can't be passed to make_hints if incremental is not None: + if path: + raise ValueError("Incremental cannot be set on nested tables") t["incremental"] = Incremental.ensure_instance(incremental) - self._set_hints(t, create_table_variant) + hints_instance._set_hints(t, create_table_variant) + if path: + self._nested_hints[path] = hints_instance return self def _set_hints( @@ -564,6 +663,7 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> """Creates table schema from resource hints and resource name. Resource hints are resolved (do not contain callables) and will be modified in place """ + resource_hints["name"] = resource_hints.pop("table_name") DltResourceHints._merge_keys(resource_hints) if "write_disposition" in resource_hints: if isinstance(resource_hints["write_disposition"], str): diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 366e6e1a88..eec72df728 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -12,6 +12,8 @@ Any, Optional, Mapping, + List, + Tuple, ) from typing_extensions import TypeVar, Self @@ -466,7 +468,9 @@ def _set_hints( if table_schema_template.get("validator") is not None: self.validator = table_schema_template["validator"] - def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: + def compute_table_schema( + self, item: TDataItem = None, meta: Any = None, path: Tuple[str, ...] 
= None + ) -> TTableSchema: incremental: Optional[Union[Incremental[Any], IncrementalResourceWrapper]] = ( self.incremental ) @@ -477,7 +481,6 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab self._hints["incremental"] = incremental table_schema = super().compute_table_schema(item, meta) - return table_schema def bind(self: TDltResourceImpl, *args: Any, **kwargs: Any) -> TDltResourceImpl: From 39ac90fd1e4ebcf7834d9792a6bd78a71a072f34 Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Thu, 19 Dec 2024 12:38:23 -0500 Subject: [PATCH 3/8] Arrow fix --- dlt/extract/extractors.py | 89 ++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index bc5c0422ff..ce72905f13 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -411,56 +411,59 @@ def _write_item( def _compute_table( self, resource: DltResource, items: TDataItems, meta: Any - ) -> TPartialTableSchema: - arrow_table: TTableSchema = None + ) -> List[TPartialTableSchema]: + # arrow_table: TTableSchema = None + arrow_tables: Dict[str, TTableSchema] = {} # several arrow tables will update the pipeline schema and we want that earlier # arrow tables override the latter so the resultant schema is the same as if # they are sent separately for item in reversed(items): - computed_table = super()._compute_table(resource, item, Any) - # Merge the columns to include primary_key and other hints that may be set on the resource - if arrow_table: - utils.merge_table(self.schema.name, computed_table, arrow_table) - else: - arrow_table = copy(computed_table) - arrow_table["columns"] = pyarrow.py_arrow_to_table_schema_columns(item.schema) - - # Add load_id column if needed - dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID) - if self._normalize_config.add_dlt_load_id and dlt_load_id not in arrow_table["columns"]: - # will be normalized line below - 
arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column() - - # normalize arrow table before merging - arrow_table = utils.normalize_table_identifiers(arrow_table, self.schema.naming) - # issue warnings when overriding computed with arrow - override_warn: bool = False - for col_name, column in arrow_table["columns"].items(): - if src_column := computed_table["columns"].get(col_name): - for hint_name, hint in column.items(): - if (src_hint := src_column.get(hint_name)) is not None: - if src_hint != hint: - override_warn = True - logger.info( - f"In resource: {resource.name}, when merging arrow schema on" - f" column {col_name}. The hint {hint_name} value" - f" {src_hint} defined in resource will overwrite arrow hint" - f" with value {hint}." - ) - if override_warn: - logger.warning( - f"In resource: {resource.name}, when merging arrow schema with dlt schema," - " several column hints were different. dlt schema hints were kept and arrow" - " schema and data were unmodified. It is up to destination to coerce the" - " differences when loading. Change log level to INFO for more details." 
+ computed_tables = super()._compute_table(resource, item, Any) + for computed_table in computed_tables: + arrow_table = arrow_tables.get(computed_table['name']) + # Merge the columns to include primary_key and other hints that may be set on the resource + if arrow_table: + utils.merge_table(self.schema.name, computed_table, arrow_table) + else: + arrow_table = arrow_tables[computed_table['name']] = copy(computed_table) + arrow_table["columns"] = pyarrow.py_arrow_to_table_schema_columns(item.schema) + + # Add load_id column if needed + dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID) + if self._normalize_config.add_dlt_load_id and dlt_load_id not in arrow_table["columns"]: + # will be normalized line below + arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column() + + # normalize arrow table before merging + arrow_table = utils.normalize_table_identifiers(arrow_table, self.schema.naming) + # issue warnings when overriding computed with arrow + override_warn: bool = False + for col_name, column in arrow_table["columns"].items(): + if src_column := computed_table["columns"].get(col_name): + for hint_name, hint in column.items(): + if (src_hint := src_column.get(hint_name)) is not None: + if src_hint != hint: + override_warn = True + logger.info( + f"In resource: {resource.name}, when merging arrow schema on" + f" column {col_name}. The hint {hint_name} value" + f" {src_hint} defined in resource will overwrite arrow hint" + f" with value {hint}." + ) + if override_warn: + logger.warning( + f"In resource: {resource.name}, when merging arrow schema with dlt schema," + " several column hints were different. dlt schema hints were kept and arrow" + " schema and data were unmodified. It is up to destination to coerce the" + " differences when loading. Change log level to INFO for more details." 
+ ) + + utils.merge_columns( + arrow_table["columns"], computed_table["columns"], merge_columns=True ) - utils.merge_columns( - arrow_table["columns"], computed_table["columns"], merge_columns=True - ) - - return arrow_table + return list(arrow_tables.values()) def _compute_and_update_table( self, resource: DltResource, table_name: str, items: TDataItems, meta: Any From 34595446ba0da318a42dc15f3ceeeb98d3d0a58c Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Fri, 20 Dec 2024 10:17:30 -0500 Subject: [PATCH 4/8] Handle TableNameMeta --- dlt/extract/extractors.py | 4 ++-- dlt/extract/hints.py | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index ce72905f13..3bf1bbc1c0 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -243,10 +243,10 @@ def _compute_and_update_table( Computes new table and does contract checks, if false is returned, the table may not be created and no items should be written """ computed_tables = self._compute_table(resource, items, meta) + # overwrite root table name (if coming from meta) + computed_tables[0]["name"] = root_table_name for computed_table in computed_tables: - # # overwrite table name (if coming from meta) - # computed_table["name"] = table_name # TODO: don't remove this table_name = computed_table["name"] # get or compute contract schema_contract = self._table_contracts.setdefault( diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 8ecae69cb6..28d6078d1e 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -315,7 +315,10 @@ def compute_table_chain(self, item: TDataItem = None, meta: Any = None) -> List[ Nested hints are resolved recursively. 
""" root_table = self.compute_table_schema(item, meta) - root_table_name = root_table["name"] + if isinstance(meta, TableNameMeta): + root_table_name = meta.table_name + else: + root_table_name = root_table["name"] result = [root_table] for path, instance in self._walk_nested_hints(): full_path = [root_table_name] + path @@ -679,10 +682,10 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> @staticmethod def validate_dynamic_hints(template: TResourceHints) -> None: - table_name = template.get("name") + table_name = template.get("table_name") # if any of the hints is a function, then name must be as well. if any( - callable(v) for k, v in template.items() if k not in ["name", *NATURAL_CALLABLES] + callable(v) for k, v in template.items() if k not in ["table_name", *NATURAL_CALLABLES] ) and not callable(table_name): raise InconsistentTableTemplate( f"Table name {table_name} must be a function if any other table hint is a function" From 309c3d71ccd2a523003c0a92cfb752d8cd9c1335 Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Fri, 20 Dec 2024 10:30:34 -0500 Subject: [PATCH 5/8] Fix name hint --- dlt/extract/hints.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 28d6078d1e..b37bca3173 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -533,11 +533,11 @@ def _set_hints( self._hints_variants[table_name] = hints_template else: # if "name" is callable in the template, then the table schema requires data item to be inferred. - name_hint = hints_template.get("name") + name_hint = hints_template.get("table_name") self._table_name_hint_fun = name_hint if callable(name_hint) else None # check if any other hints in the table template should be inferred from data. 
self._table_has_other_dynamic_hints = any( - callable(v) for k, v in hints_template.items() if k != "name" + callable(v) for k, v in hints_template.items() if k != "table_name" ) self._hints = hints_template From 743816ff92a99bef3e7c6e593f73acc4fb5d66a6 Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Fri, 20 Dec 2024 11:03:50 -0500 Subject: [PATCH 6/8] Arrow fix, all tests/extract running --- dlt/extract/extractors.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index 3bf1bbc1c0..f42067b4c4 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -426,7 +426,7 @@ def _compute_table( if arrow_table: utils.merge_table(self.schema.name, computed_table, arrow_table) else: - arrow_table = arrow_tables[computed_table['name']] = copy(computed_table) + arrow_table = copy(computed_table) arrow_table["columns"] = pyarrow.py_arrow_to_table_schema_columns(item.schema) # Add load_id column if needed @@ -462,6 +462,7 @@ def _compute_table( utils.merge_columns( arrow_table["columns"], computed_table["columns"], merge_columns=True ) + arrow_tables[computed_table["name"]] = arrow_table return list(arrow_tables.values()) From d0a83b29d466514ee147df2d4e0435e311457e3e Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Fri, 20 Dec 2024 12:01:05 -0500 Subject: [PATCH 7/8] Nested hints tests, handle table name overrides --- dlt/extract/extractors.py | 11 ++-- dlt/extract/hints.py | 24 +++++--- tests/pipeline/test_table_hints.py | 98 ++++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 14 deletions(-) create mode 100644 tests/pipeline/test_table_hints.py diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index f42067b4c4..b10d740dbb 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -421,7 +421,7 @@ def _compute_table( for item in reversed(items): computed_tables = super()._compute_table(resource, item, Any) for computed_table in 
computed_tables: - arrow_table = arrow_tables.get(computed_table['name']) + arrow_table = arrow_tables.get(computed_table["name"]) # Merge the columns to include primary_key and other hints that may be set on the resource if arrow_table: utils.merge_table(self.schema.name, computed_table, arrow_table) @@ -431,7 +431,10 @@ def _compute_table( # Add load_id column if needed dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID) - if self._normalize_config.add_dlt_load_id and dlt_load_id not in arrow_table["columns"]: + if ( + self._normalize_config.add_dlt_load_id + and dlt_load_id not in arrow_table["columns"] + ): # will be normalized line below arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column() @@ -446,8 +449,8 @@ def _compute_table( if src_hint != hint: override_warn = True logger.info( - f"In resource: {resource.name}, when merging arrow schema on" - f" column {col_name}. The hint {hint_name} value" + f"In resource: {resource.name}, when merging arrow schema" + f" on column {col_name}. The hint {hint_name} value" f" {src_hint} defined in resource will overwrite arrow hint" f" with value {hint}." 
) diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index b37bca3173..ea78d4688a 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -259,15 +259,15 @@ def parent_table_name(self) -> TTableHintTemplate[str]: return None if self._hints is None else self._hints.get("parent") def _walk_nested_hints( - self, path: List[str] = None - ) -> Iterator[Tuple[List[str], "DltResourceHints"]]: + self, path: Tuple[str] = None + ) -> Iterator[Tuple[Tuple[str, ...], "DltResourceHints"]]: """Walk nested hints recursively to generate a flat iterator of path and `DltResourceHints` instance pairs""" if path is None: - path = [] + path = tuple() if path: yield path, self for key, nested_instance in self._nested_hints.items(): - yield from nested_instance._walk_nested_hints(path + [key]) + yield from nested_instance._walk_nested_hints(path + (key,)) def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: """Computes the table schema based on hints and column definitions passed during resource creation. 
@@ -320,12 +320,16 @@ def compute_table_chain(self, item: TDataItem = None, meta: Any = None) -> List[ else: root_table_name = root_table["name"] result = [root_table] + path_to_name: Dict[Tuple[str, ...], str] = {(root_table_name,): root_table_name} for path, instance in self._walk_nested_hints(): - full_path = [root_table_name] + path - table_name = "__".join(full_path) # TODO: naming convention + full_path = (root_table_name,) + path table = instance.compute_table_schema(item, meta) - table["name"] = table_name - parent_name = "__".join(full_path[:-1]) + if not table.get("name"): + table["name"] = "__".join(full_path) # TODO: naming convention + path_to_name[full_path] = table["name"] + parent_name = path_to_name[full_path[:-1]] + + # parent_name = "__".join(full_path[:-1]) table["parent"] = parent_name result.append(table) @@ -369,7 +373,7 @@ def apply_hints( hints_instance: DltResourceHints if path: path = tuple(path) - hints_instance = self._nested_hints.get(path) + hints_instance = self._nested_hints.get(path) # type: ignore[call-overload] if not hints_instance: hints_instance = DltResourceHints() else: @@ -666,7 +670,7 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> """Creates table schema from resource hints and resource name. 
Resource hints are resolved (do not contain callables) and will be modified in place """ - resource_hints["name"] = resource_hints.pop("table_name") + resource_hints["name"] = resource_hints.pop("table_name") # type: ignore[typeddict-unknown-key] DltResourceHints._merge_keys(resource_hints) if "write_disposition" in resource_hints: if isinstance(resource_hints["write_disposition"], str): diff --git a/tests/pipeline/test_table_hints.py b/tests/pipeline/test_table_hints.py new file mode 100644 index 0000000000..ce0cda5d28 --- /dev/null +++ b/tests/pipeline/test_table_hints.py @@ -0,0 +1,98 @@ +import dlt + + +@dlt.resource +def nested_data(): + yield [ + { + "id": 1, + "a": [ + { + "a_id": "2", + "b": [{"b_id": "3"}], + } + ], + "c": [ + { + "c_id": "4", + "d": [{"d_id": "5"}], + } + ], + } + ] + + +def test_apply_hints_nested_hints_column_types() -> None: + nested_data_rs = nested_data() + + nested_data_rs.apply_hints( + path=["a", "b"], + columns=[ + { + "name": "b_id", + "data_type": "bigint", + }, + ], + ) + nested_data_rs.apply_hints( + path=["c"], + columns=[ + { + "name": "c_id", + "data_type": "double", + }, + ], + ) + + pipeline = dlt.pipeline( + pipeline_name="test_apply_hints_nested_hints", dev_mode=True, destination="duckdb" + ) + pipeline.run(nested_data_rs) + + schema_tables = pipeline.default_schema.tables + + assert schema_tables["nested_data__a__b"]["columns"]["b_id"]["data_type"] == "bigint" + assert schema_tables["nested_data__c"]["columns"]["c_id"]["data_type"] == "double" + + assert schema_tables["nested_data__a__b"]["parent"] == "nested_data__a" + assert schema_tables["nested_data__c"]["parent"] == "nested_data" + + # Try changing the parent name + nested_data_rs.apply_hints(table_name="override_parent") + + pipeline = dlt.pipeline( + pipeline_name="test_apply_hints_nested_hints_2", dev_mode=True, destination="duckdb" + ) + pipeline.run(nested_data_rs) + + schema_tables = pipeline.default_schema.tables + + assert 
schema_tables["override_parent__a__b"]["parent"] == "override_parent__a" + assert schema_tables["override_parent__c"]["parent"] == "override_parent" + assert schema_tables["override_parent__a__b"]["name"] == "override_parent__a__b" + assert schema_tables["override_parent__a__b"]["columns"]["b_id"]["data_type"] == "bigint" + + for key in schema_tables.keys(): + assert not key.startswith("nested_data") + + +def test_apply_hints_nested_hints_override_child_name(): + nested_data_rs = nested_data() + + # Override both levels of child tables + nested_data_rs.apply_hints(path=["a"], table_name="override_child_a") + nested_data_rs.apply_hints(path=["a", "b"], table_name="override_child_a_b") + + pipeline = dlt.pipeline( + pipeline_name="test_apply_hints_nested_hints_3", dev_mode=True, destination="duckdb" + ) + + pipeline.run(nested_data_rs) + + schema_tables = pipeline.default_schema.tables + + assert schema_tables["override_child_a_b"]["name"] == "override_child_a_b" + # Parent should match the overridden parent name + assert schema_tables["override_child_a_b"]["parent"] == "override_child_a" + + assert schema_tables["override_child_a"]["name"] == "override_child_a" From ee28048637006456dd5b55389ab381b886fb7e31 Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Fri, 20 Dec 2024 12:02:31 -0500 Subject: [PATCH 8/8] lint --- dlt/extract/hints.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index ea78d4688a..9cf5d6db3b 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -263,11 +263,11 @@ def _walk_nested_hints( ) -> Iterator[Tuple[Tuple[str, ...], "DltResourceHints"]]: """Walk nested hints recursively to generate a flat iterator of path and `DltResourceHints` instance pairs""" if path is None: - path = tuple() + path = tuple() # type: ignore[assignment] if path: yield path, self for key, nested_instance in self._nested_hints.items(): - yield from nested_instance._walk_nested_hints(path +
(key,)) + yield from nested_instance._walk_nested_hints(path + (key,)) # type: ignore[arg-type] def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: """Computes the table schema based on hints and column definitions passed during resource creation.