Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply hints for nested tables #2165

Draft
wants to merge 8 commits into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ class TTableReference(TypedDict):

class _TTableSchemaBase(TTableProcessingHints, total=False):
name: Optional[str]
path: Optional[Tuple[str, ...]]
"""Path of nested table"""
description: Optional[str]
schema_contract: Optional[TSchemaContract]
table_sealed: Optional[bool]
Expand Down
180 changes: 97 additions & 83 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No
# convert to table meta if created table variant so item is assigned to this table
if meta.create_table_variant:
# name in hints meta must be a string, otherwise merge_hints would fail
meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type]
meta = TableNameMeta(meta.hints["table_name"]) # type: ignore[arg-type]
self._reset_contracts_cache()

if table_name := self._get_static_table_name(resource, meta):
Expand Down Expand Up @@ -227,56 +227,63 @@ def _write_to_static_table(
else:
self._write_item(table_name, resource.name, items)

def _compute_table(self, resource: DltResource, items: TDataItems, meta: Any) -> TTableSchema:
def _compute_table(
self, resource: DltResource, items: TDataItems, meta: Any
) -> List[TTableSchema]:
"""Computes a schema for a new or dynamic table and normalizes identifiers"""
return utils.normalize_table_identifiers(
resource.compute_table_schema(items, meta), self.schema.naming
)
return [
utils.normalize_table_identifiers(tbl, self.schema.naming)
for tbl in resource.compute_table_chain(items, meta)
]

def _compute_and_update_table(
self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
self, resource: DltResource, root_table_name: str, items: TDataItems, meta: Any
) -> TDataItems:
"""
Computes new table and does contract checks, if false is returned, the table may not be created and no items should be written
"""
computed_table = self._compute_table(resource, items, meta)
# overwrite table name (if coming from meta)
computed_table["name"] = table_name
# get or compute contract
schema_contract = self._table_contracts.setdefault(
table_name, self.schema.resolve_contract_settings_for_table(table_name, computed_table)
)

# this is a new table so allow evolve once
if schema_contract["columns"] != "evolve" and self.schema.is_new_table(table_name):
computed_table["x-normalizer"] = {"evolve-columns-once": True}
existing_table = self.schema.tables.get(table_name, None)
if existing_table:
# TODO: revise this. computed table should overwrite certain hints (ie. primary and merge keys) completely
diff_table = utils.diff_table(self.schema.name, existing_table, computed_table)
else:
diff_table = computed_table
computed_tables = self._compute_table(resource, items, meta)
# overwrite root table name (if coming from meta)
computed_tables[0]["name"] = root_table_name

for computed_table in computed_tables:
table_name = computed_table["name"]
# get or compute contract
schema_contract = self._table_contracts.setdefault(
table_name,
self.schema.resolve_contract_settings_for_table(table_name, computed_table),
)

# apply contracts
diff_table, filters = self.schema.apply_schema_contract(
schema_contract, diff_table, data_item=items
)
# this is a new table so allow evolve once
if schema_contract["columns"] != "evolve" and self.schema.is_new_table(table_name):
computed_table["x-normalizer"] = {"evolve-columns-once": True}
existing_table = self.schema.tables.get(table_name, None)
if existing_table:
# TODO: revise this. computed table should overwrite certain hints (ie. primary and merge keys) completely
diff_table = utils.diff_table(self.schema.name, existing_table, computed_table)
else:
diff_table = computed_table

# merge with schema table
if diff_table:
# diff table identifiers already normalized
self.schema.update_table(
diff_table, normalize_identifiers=False, from_diff=bool(existing_table)
# apply contracts
diff_table, filters = self.schema.apply_schema_contract(
schema_contract, diff_table, data_item=items
)

# process filters
if filters:
for entity, name, mode in filters:
if entity == "tables":
self._filtered_tables.add(name)
elif entity == "columns":
filtered_columns = self._filtered_columns.setdefault(table_name, {})
filtered_columns[name] = mode
# merge with schema table
if diff_table:
# diff table identifiers already normalized
self.schema.update_table(
diff_table, normalize_identifiers=False, from_diff=bool(existing_table)
)

# process filters
if filters:
for entity, name, mode in filters:
if entity == "tables":
self._filtered_tables.add(name)
elif entity == "columns":
filtered_columns = self._filtered_columns.setdefault(table_name, {})
filtered_columns[name] = mode
return items

def _reset_contracts_cache(self) -> None:
Expand Down Expand Up @@ -404,56 +411,63 @@ def _write_item(

def _compute_table(
self, resource: DltResource, items: TDataItems, meta: Any
) -> TPartialTableSchema:
arrow_table: TTableSchema = None
) -> List[TPartialTableSchema]:
# arrow_table: TTableSchema = None
arrow_tables: Dict[str, TTableSchema] = {}

# several arrow tables will update the pipeline schema and we want that earlier
# arrow tables override the latter so the resultant schema is the same as if
# they are sent separately
for item in reversed(items):
computed_table = super()._compute_table(resource, item, Any)
# Merge the columns to include primary_key and other hints that may be set on the resource
if arrow_table:
utils.merge_table(self.schema.name, computed_table, arrow_table)
else:
arrow_table = copy(computed_table)
arrow_table["columns"] = pyarrow.py_arrow_to_table_schema_columns(item.schema)

# Add load_id column if needed
dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID)
if self._normalize_config.add_dlt_load_id and dlt_load_id not in arrow_table["columns"]:
# will be normalized line below
arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column()

# normalize arrow table before merging
arrow_table = utils.normalize_table_identifiers(arrow_table, self.schema.naming)
# issue warnings when overriding computed with arrow
override_warn: bool = False
for col_name, column in arrow_table["columns"].items():
if src_column := computed_table["columns"].get(col_name):
for hint_name, hint in column.items():
if (src_hint := src_column.get(hint_name)) is not None:
if src_hint != hint:
override_warn = True
logger.info(
f"In resource: {resource.name}, when merging arrow schema on"
f" column {col_name}. The hint {hint_name} value"
f" {src_hint} defined in resource will overwrite arrow hint"
f" with value {hint}."
)
if override_warn:
logger.warning(
f"In resource: {resource.name}, when merging arrow schema with dlt schema,"
" several column hints were different. dlt schema hints were kept and arrow"
" schema and data were unmodified. It is up to destination to coerce the"
" differences when loading. Change log level to INFO for more details."
computed_tables = super()._compute_table(resource, item, Any)
for computed_table in computed_tables:
arrow_table = arrow_tables.get(computed_table["name"])
# Merge the columns to include primary_key and other hints that may be set on the resource
if arrow_table:
utils.merge_table(self.schema.name, computed_table, arrow_table)
else:
arrow_table = copy(computed_table)
arrow_table["columns"] = pyarrow.py_arrow_to_table_schema_columns(item.schema)

# Add load_id column if needed
dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID)
if (
self._normalize_config.add_dlt_load_id
and dlt_load_id not in arrow_table["columns"]
):
# will be normalized line below
arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column()

# normalize arrow table before merging
arrow_table = utils.normalize_table_identifiers(arrow_table, self.schema.naming)
# issue warnings when overriding computed with arrow
override_warn: bool = False
for col_name, column in arrow_table["columns"].items():
if src_column := computed_table["columns"].get(col_name):
for hint_name, hint in column.items():
if (src_hint := src_column.get(hint_name)) is not None:
if src_hint != hint:
override_warn = True
logger.info(
f"In resource: {resource.name}, when merging arrow schema"
f" on column {col_name}. The hint {hint_name} value"
f" {src_hint} defined in resource will overwrite arrow hint"
f" with value {hint}."
)
if override_warn:
logger.warning(
f"In resource: {resource.name}, when merging arrow schema with dlt schema,"
" several column hints were different. dlt schema hints were kept and arrow"
" schema and data were unmodified. It is up to destination to coerce the"
" differences when loading. Change log level to INFO for more details."
)

utils.merge_columns(
arrow_table["columns"], computed_table["columns"], merge_columns=True
)
arrow_tables[computed_table["name"]] = arrow_table

utils.merge_columns(
arrow_table["columns"], computed_table["columns"], merge_columns=True
)

return arrow_table
return list(arrow_tables.values())

def _compute_and_update_table(
self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
Expand Down
Loading
Loading