Skip to content

Commit

Permalink
make scd2 validity column names configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorrit Sandbrink committed Apr 1, 2024
1 parent 115b4c9 commit 37befbc
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 104 deletions.
2 changes: 1 addition & 1 deletion dlt/common/normalizers/json/relational.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def normalize_data_item(
# determine merge strategy
merge_strategy = None
if table_name in self.schema.data_table_names():
merge_strategy = self.schema.get_table(table_name).get("merge_strategy")
merge_strategy = self.schema.get_table(table_name).get("x-merge-strategy")
yield from self._normalize_row(
cast(TDataItemRowChild, row),
{},
Expand Down
9 changes: 8 additions & 1 deletion dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ class NormalizerInfo(TypedDict, total=True):
new_table: bool


class TMergeConfig(TypedDict, total=False):
strategy: Optional[TLoaderMergeStrategy]
validity_column_names: Optional[List[str]]


DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"]
"""Default values for validity column names used in `scd2` merge strategy."""

# TypedDict that defines properties of a table


Expand All @@ -171,7 +179,6 @@ class TTableSchema(TypedDict, total=False):
columns: TTableSchemaColumns
resource: Optional[str]
table_format: Optional[TTableFormat]
merge_strategy: Optional[TLoaderMergeStrategy]


class TPartialTableSchema(TTableSchema):
Expand Down
7 changes: 1 addition & 6 deletions dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,7 @@ def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> L
return []

def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
merge_strategy = self.schema.get_table(table_chain[0]["name"]).get("merge_strategy")
return [
SqlMergeJob.from_table_chain(
table_chain, self.sql_client, {"merge_strategy": merge_strategy}
)
]
return [SqlMergeJob.from_table_chain(table_chain, self.sql_client)]

def _create_replace_followup_jobs(
self, table_chain: Sequence[TTableSchema]
Expand Down
53 changes: 31 additions & 22 deletions dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from dlt.common.runtime.logger import pretty_format_exception

from dlt.common import pendulum
from dlt.common.schema.typing import TTableSchema, TSortOrder, TLoaderMergeStrategy
from dlt.common.schema.typing import (
TTableSchema,
TSortOrder,
TMergeConfig,
)
from dlt.common.schema.utils import (
get_columns_names_with_prop,
get_first_column_name_with_prop,
Expand All @@ -25,7 +29,7 @@

class SqlJobParams(TypedDict, total=False):
replace: Optional[bool]
merge_strategy: Optional[TLoaderMergeStrategy]
merge_config: Optional[TMergeConfig]


DEFAULTS: SqlJobParams = {"replace": False}
Expand Down Expand Up @@ -151,19 +155,7 @@ def generate_sql(
sql_client: SqlClientBase[Any],
params: Optional[SqlJobParams] = None,
) -> List[str]:
"""Generates a list of sql statements that merge the data in staging dataset with the data in destination dataset.
The `table_chain` contains a list schemas of a tables with parent-child relationship, ordered by the ancestry (the root of the tree is first on the list).
The root table is merged using primary_key and merge_key hints which can be compound and be both specified. In that case the OR clause is generated.
The child tables are merged based on propagated `root_key` which is a type of foreign key but always leading to a root table.
First we store the root_keys of root table elements to be deleted in the temp table. Then we use the temp table to delete records from root and all child tables in the destination dataset.
At the end we copy the data from the staging dataset into destination dataset.
If a hard_delete column is specified, records flagged as deleted will be excluded from the copy into the destination dataset.
If a dedup_sort column is specified in conjunction with a primary key, records will be sorted before deduplication, so the "latest" record remains.
"""
if params["merge_strategy"] == "scd2":
if table_chain[0].get("x-merge-strategy") == "scd2":
return cls.gen_scd2_sql(table_chain, sql_client)
return cls.gen_merge_sql(table_chain, sql_client)

Expand Down Expand Up @@ -342,6 +334,18 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
def gen_merge_sql(
cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]
) -> List[str]:
"""Generates a list of sql statements that merge the data in staging dataset with the data in destination dataset.
The `table_chain` contains a list schemas of a tables with parent-child relationship, ordered by the ancestry (the root of the tree is first on the list).
The root table is merged using primary_key and merge_key hints which can be compound and be both specified. In that case the OR clause is generated.
The child tables are merged based on propagated `root_key` which is a type of foreign key but always leading to a root table.
First we store the root_keys of root table elements to be deleted in the temp table. Then we use the temp table to delete records from root and all child tables in the destination dataset.
At the end we copy the data from the staging dataset into destination dataset.
If a hard_delete column is specified, records flagged as deleted will be excluded from the copy into the destination dataset.
If a dedup_sort column is specified in conjunction with a primary key, records will be sorted before deduplication, so the "latest" record remains.
"""
sql: List[str] = []
root_table = table_chain[0]

Expand Down Expand Up @@ -492,7 +496,7 @@ def gen_merge_sql(
def gen_scd2_sql(
cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]
) -> List[str]:
"""Returns SQL statements for the `scd2` merge strategy.
"""Generates SQL statements for the `scd2` merge strategy.
The root table can be inserted into and updated.
Updates only take place when a record retires (because there is a new version
Expand All @@ -505,25 +509,30 @@ def gen_scd2_sql(
with sql_client.with_staging_dataset(staging=True):
staging_root_table_name = sql_client.make_qualified_table_name(root_table["name"])

# get validity column names
escape_id = sql_client.capabilities.escape_identifier
from_ = escape_id(get_first_column_name_with_prop(root_table, "x-valid-from"))
to = escape_id(get_first_column_name_with_prop(root_table, "x-valid-to"))

# define values for validity columns
boundary_ts = current_load_package()["state"]["created_at"]
active_record_ts = HIGH_TS.isoformat()

# retire updated and deleted records
sql.append(f"""
UPDATE {root_table_name} SET valid_to = '{boundary_ts}'
UPDATE {root_table_name} SET {to} = '{boundary_ts}'
WHERE NOT EXISTS (
SELECT s._dlt_id FROM {staging_root_table_name} AS s
WHERE {root_table_name}._dlt_id = s._dlt_id
) AND valid_to = '{active_record_ts}';
) AND {to} = '{active_record_ts}';
""")

# insert new active records in root table
columns = list(root_table["columns"].keys())
col_str = ", ".join([c for c in columns if c not in ("valid_from", "valid_to")])
columns = map(escape_id, list(root_table["columns"].keys()))
col_str = ", ".join([c for c in columns if c not in (from_, to)])
sql.append(f"""
INSERT INTO {root_table_name} ({col_str}, valid_from, valid_to)
SELECT {col_str}, '{boundary_ts}' AS valid_from, '{active_record_ts}' AS valid_to
INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
SELECT {col_str}, '{boundary_ts}' AS {from_}, '{active_record_ts}' AS {to}
FROM {staging_root_table_name} AS s
WHERE NOT EXISTS (SELECT s._dlt_id FROM {root_table_name} AS f WHERE f._dlt_id = s._dlt_id);
""")
Expand Down
16 changes: 8 additions & 8 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from dlt.common.schema.typing import (
TColumnNames,
TWriteDisposition,
TLoaderMergeStrategy,
TMergeConfig,
TAnySchemaColumns,
TSchemaContract,
TTableFormat,
Expand Down Expand Up @@ -297,7 +297,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_strategy: TLoaderMergeStrategy = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
Expand All @@ -316,7 +316,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_strategy: TLoaderMergeStrategy = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
Expand All @@ -335,7 +335,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_strategy: TLoaderMergeStrategy = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
Expand All @@ -355,7 +355,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_strategy: TLoaderMergeStrategy = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
Expand All @@ -373,7 +373,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_strategy: TLoaderMergeStrategy = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
Expand Down Expand Up @@ -426,7 +426,7 @@ def resource(
merge_key (str | Sequence[str]): A column name or a list of column names that define a merge key. Typically used with "merge" write disposition to remove overlapping data ranges ie. to keep a single record for a given day.
This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
merge_strategy (TLoaderMergeStrategy): The merge strategy to use. Only applies when the `merge` write disposition is used. If `None`, the standard strategy is used.
merge_config (TMergeConfig): A dictionary to customize behavior of the `merge` write disposition. Can for example be used to configure the `scd2` merge strategy.
schema_contract (TSchemaContract, optional): Schema contract settings that will be applied to all resources of this source (if not overridden in the resource itself)
table_format (Literal["iceberg"], optional): Defines the storage format of the table. Currently only "iceberg" is supported on Athena, other destinations ignore this hint.
Expand Down Expand Up @@ -458,7 +458,7 @@ def make_resource(
columns=columns,
primary_key=primary_key,
merge_key=merge_key,
merge_strategy=merge_strategy,
merge_config=merge_config,
schema_contract=schema_contract,
table_format=table_format,
)
Expand Down
51 changes: 32 additions & 19 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
TAnySchemaColumns,
TTableFormat,
TSchemaContract,
TMergeConfig,
DEFAULT_VALIDITY_COLUMN_NAMES,
)
from dlt.common import logger
from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION, merge_column, new_column, new_table
from dlt.common.typing import TDataItem
from dlt.common.utils import update_dict_nested
from dlt.common.validation import validate_dict_ignoring_xkeys
from dlt.common.schema.typing import TLoaderMergeStrategy
from dlt.extract.exceptions import (
DataItemRequiredForDynamicTableHints,
InconsistentTableTemplate,
Expand All @@ -37,7 +38,7 @@ class TResourceHints(TypedDict, total=False):
columns: TTableHintTemplate[TTableSchemaColumns]
primary_key: TTableHintTemplate[TColumnNames]
merge_key: TTableHintTemplate[TColumnNames]
merge_strategy: TLoaderMergeStrategy
merge_config: TMergeConfig
incremental: Incremental[Any]
schema_contract: TTableHintTemplate[TSchemaContract]
table_format: TTableHintTemplate[TTableFormat]
Expand All @@ -63,7 +64,7 @@ def make_hints(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_strategy: TLoaderMergeStrategy = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
) -> TResourceHints:
Expand Down Expand Up @@ -99,8 +100,8 @@ def make_hints(
new_template["primary_key"] = primary_key
if merge_key is not None:
new_template["merge_key"] = merge_key
if merge_strategy is not None:
new_template["merge_strategy"] = merge_strategy
if merge_config is not None:
new_template["merge_config"] = merge_config
if validator:
new_template["validator"] = validator
DltResourceHints.validate_dynamic_hints(new_template)
Expand Down Expand Up @@ -172,24 +173,19 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab
if "name" not in table_template:
table_template["name"] = self.name

# add columns for `scd2` merge strategy
if "merge_strategy" in table_template and table_template["merge_strategy"] == "scd2":
table_template["columns"] = {
"valid_from": {"name": "valid_from", "data_type": "timestamp"},
"valid_to": {"name": "valid_to", "data_type": "timestamp"},
}

# if table template present and has dynamic hints, the data item must be provided.
if self._table_name_hint_fun and item is None:
raise DataItemRequiredForDynamicTableHints(self.name)
# resolve
merge_config = table_template.pop("merge_config", None)
resolved_template: TResourceHints = {
k: self._resolve_hint(item, v)
for k, v in table_template.items()
if k not in NATURAL_CALLABLES
} # type: ignore
table_schema = self._merge_keys(resolved_template)
table_schema["resource"] = self.name
self._resolve_merge_config(merge_config, table_schema)
validate_dict_ignoring_xkeys(
spec=TTableSchema,
doc=table_schema,
Expand All @@ -205,7 +201,7 @@ def apply_hints(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_strategy: TLoaderMergeStrategy = None,
merge_config: TMergeConfig = None,
incremental: Incremental[Any] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
additional_table_hints: Optional[Dict[str, TTableHintTemplate[Any]]] = None,
Expand Down Expand Up @@ -255,7 +251,7 @@ def apply_hints(
columns,
primary_key,
merge_key,
merge_strategy,
merge_config,
schema_contract,
table_format,
)
Expand Down Expand Up @@ -297,11 +293,11 @@ def apply_hints(
t["merge_key"] = merge_key
else:
t.pop("merge_key", None)
if merge_strategy is not None:
if merge_strategy:
t["merge_strategy"] = merge_strategy
if merge_config is not None:
if merge_config:
t["merge_config"] = merge_config
else:
t.pop("merge_strategy", None)
t.pop("merge_config", None)
if schema_contract is not None:
if schema_contract:
t["schema_contract"] = schema_contract
Expand Down Expand Up @@ -419,9 +415,26 @@ def _merge_keys(t_: TResourceHints) -> TPartialTableSchema:
DltResourceHints._merge_key("primary_key", t_.pop("primary_key"), partial) # type: ignore
if "merge_key" in t_:
DltResourceHints._merge_key("merge_key", t_.pop("merge_key"), partial) # type: ignore

return partial

@staticmethod
def _resolve_merge_config(
merge_config: Optional[TMergeConfig], partial: TPartialTableSchema
) -> None:
"""Resolves `merge_config` into x-hints on `partial` table schema in place."""
if merge_config is not None:
if "strategy" in merge_config:
partial["x-merge-strategy"] = merge_config["strategy"] # type: ignore[typeddict-unknown-key]

# add columns for `scd2` merge strategy
if partial.get("x-merge-strategy") == "scd2":
if merge_config.get("validity_column_names") is None:
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
else:
from_, to = merge_config["validity_column_names"]
partial["columns"][from_] = {"name": from_, "data_type": "timestamp", "x-valid-from": True} # type: ignore[typeddict-unknown-key]
partial["columns"][to] = {"name": to, "data_type": "timestamp", "x-valid-to": True} # type: ignore[typeddict-unknown-key]

@staticmethod
def validate_dynamic_hints(template: TResourceHints) -> None:
table_name = template.get("name")
Expand Down
20 changes: 13 additions & 7 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,14 +1317,20 @@ def empty_gen():
"primary_key": True,
"merge_key": True,
}
# test merge strategy hint
empty_r.apply_hints(merge_strategy="scd2")
assert empty_r._hints["merge_strategy"] == "scd2"
# test merge config hint
empty_r.apply_hints(merge_config={"strategy": "scd2", "validity_column_names": ["from", "to"]})
assert empty_r._hints["merge_config"] == {
"strategy": "scd2",
"validity_column_names": ["from", "to"],
}
table = empty_r.compute_table_schema()
assert "valid_from" in table["columns"]
assert "valid_to" in table["columns"]
empty_r.apply_hints(merge_strategy="")
assert "merge_strategy" not in empty_r._hints
assert table["x-merge-strategy"] == "scd2"
assert "from" in table["columns"]
assert "x-valid-from" in table["columns"]["from"]
assert "to" in table["columns"]
assert "x-valid-to" in table["columns"]["to"]
empty_r.apply_hints(merge_config="")
assert "merge_config" not in empty_r._hints


def test_apply_dynamic_hints() -> None:
Expand Down
Loading

0 comments on commit 37befbc

Please sign in to comment.