diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py
index 236ab346a7..b0745b2761 100644
--- a/dlt/common/normalizers/json/relational.py
+++ b/dlt/common/normalizers/json/relational.py
@@ -315,7 +315,7 @@ def normalize_data_item(
         # determine merge strategy
         merge_strategy = None
         if table_name in self.schema.data_table_names():
-            merge_strategy = self.schema.get_table(table_name).get("merge_strategy")
+            merge_strategy = self.schema.get_table(table_name).get("x-merge-strategy")
         yield from self._normalize_row(
             cast(TDataItemRowChild, row),
             {},
diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py
index 8a6a0f22d2..8f2fba993e 100644
--- a/dlt/common/schema/typing.py
+++ b/dlt/common/schema/typing.py
@@ -155,6 +155,14 @@ class NormalizerInfo(TypedDict, total=True):
     new_table: bool
 
 
+class TMergeConfig(TypedDict, total=False):
+    strategy: Optional[TLoaderMergeStrategy]
+    validity_column_names: Optional[List[str]]
+
+
+DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"]
+"""Default values for validity column names used in `scd2` merge strategy."""
+
 # TypedDict that defines properties of a table
 
 
@@ -171,7 +179,6 @@ class TTableSchema(TypedDict, total=False):
     columns: TTableSchemaColumns
     resource: Optional[str]
     table_format: Optional[TTableFormat]
-    merge_strategy: Optional[TLoaderMergeStrategy]
 
 
 class TPartialTableSchema(TTableSchema):
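A minimal sketch of the shape the new `TMergeConfig` dict is meant to take; the `scd2` value and the default column names are taken from this diff, both keys are optional (`total=False`):

```python
# illustration only: a merge_config selecting the scd2 strategy with
# custom validity columns; omitting "validity_column_names" falls back to
# DEFAULT_VALIDITY_COLUMN_NAMES == ["_dlt_valid_from", "_dlt_valid_to"]
merge_config = {
    "strategy": "scd2",
    "validity_column_names": ["valid_from", "valid_to"],
}
```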
diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py
index 02a0a6343b..7896fa2cc4 100644
--- a/dlt/destinations/job_client_impl.py
+++ b/dlt/destinations/job_client_impl.py
@@ -228,12 +228,7 @@ def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> L
         return []
 
     def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
-        merge_strategy = self.schema.get_table(table_chain[0]["name"]).get("merge_strategy")
-        return [
-            SqlMergeJob.from_table_chain(
-                table_chain, self.sql_client, {"merge_strategy": merge_strategy}
-            )
-        ]
+        return [SqlMergeJob.from_table_chain(table_chain, self.sql_client)]
 
     def _create_replace_followup_jobs(
         self, table_chain: Sequence[TTableSchema]
diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py
index e2c120bc8a..cdf2f61e51 100644
--- a/dlt/destinations/sql_jobs.py
+++ b/dlt/destinations/sql_jobs.py
@@ -4,7 +4,11 @@
 from dlt.common.runtime.logger import pretty_format_exception
 
 from dlt.common import pendulum
-from dlt.common.schema.typing import TTableSchema, TSortOrder, TLoaderMergeStrategy
+from dlt.common.schema.typing import (
+    TTableSchema,
+    TSortOrder,
+    TMergeConfig,
+)
 from dlt.common.schema.utils import (
     get_columns_names_with_prop,
     get_first_column_name_with_prop,
@@ -25,7 +29,7 @@
 
 class SqlJobParams(TypedDict, total=False):
     replace: Optional[bool]
-    merge_strategy: Optional[TLoaderMergeStrategy]
+    merge_config: Optional[TMergeConfig]
 
 
 DEFAULTS: SqlJobParams = {"replace": False}
@@ -151,19 +155,7 @@ def generate_sql(
         sql_client: SqlClientBase[Any],
         params: Optional[SqlJobParams] = None,
     ) -> List[str]:
-        """Generates a list of sql statements that merge the data in staging dataset with the data in destination dataset.
-
-        The `table_chain` contains a list schemas of a tables with parent-child relationship, ordered by the ancestry (the root of the tree is first on the list).
-        The root table is merged using primary_key and merge_key hints which can be compound and be both specified. In that case the OR clause is generated.
-        The child tables are merged based on propagated `root_key` which is a type of foreign key but always leading to a root table.
-
-        First we store the root_keys of root table elements to be deleted in the temp table. Then we use the temp table to delete records from root and all child tables in the destination dataset.
-        At the end we copy the data from the staging dataset into destination dataset.
-
-        If a hard_delete column is specified, records flagged as deleted will be excluded from the copy into the destination dataset.
-        If a dedup_sort column is specified in conjunction with a primary key, records will be sorted before deduplication, so the "latest" record remains.
-        """
-        if params["merge_strategy"] == "scd2":
+        if table_chain[0].get("x-merge-strategy") == "scd2":
             return cls.gen_scd2_sql(table_chain, sql_client)
         return cls.gen_merge_sql(table_chain, sql_client)
 
@@ -342,6 +334,18 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
     def gen_merge_sql(
         cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]
     ) -> List[str]:
+        """Generates a list of SQL statements that merge the data in the staging dataset with the data in the destination dataset.
+
+        The `table_chain` contains a list of table schemas with a parent-child relationship, ordered by ancestry (the root of the tree is first on the list).
+        The root table is merged using the primary_key and merge_key hints, which can be compound and can both be specified. In that case an OR clause is generated.
+        The child tables are merged based on the propagated `root_key`, which is a kind of foreign key that always leads to the root table.
+
+        First we store the root_keys of the root table elements to be deleted in a temp table. Then we use the temp table to delete records from the root and all child tables in the destination dataset.
+        At the end we copy the data from the staging dataset into the destination dataset.
+
+        If a hard_delete column is specified, records flagged as deleted will be excluded from the copy into the destination dataset.
+        If a dedup_sort column is specified in conjunction with a primary key, records will be sorted before deduplication, so the "latest" record remains.
+        """
         sql: List[str] = []
         root_table = table_chain[0]
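To make the delete-then-copy sequence in the docstring above concrete, here is a purely hypothetical rendering; every table, key, and temp-table name below is invented, and the real statements are derived from the schema's `primary_key`/`merge_key`/`root_key` hints:

```python
# hypothetical sketch of the statement sequence gen_merge_sql describes
temp_table = "tmp_delete_keys"  # invented name; the job generates its own
statements = [
    # 1. stage root keys of records that will be replaced or deleted
    f"CREATE TEMPORARY TABLE {temp_table} AS "
    "SELECT d._dlt_id FROM dataset.root AS d "
    "JOIN staging.root AS s ON d.nk = s.nk;",
    # 2. delete from the root table and every child table via the staged keys
    f"DELETE FROM dataset.root WHERE _dlt_id IN (SELECT _dlt_id FROM {temp_table});",
    f"DELETE FROM dataset.root__child WHERE _dlt_root_id IN (SELECT _dlt_id FROM {temp_table});",
    # 3. copy staging data over (hard-deleted records would be filtered out here)
    "INSERT INTO dataset.root SELECT * FROM staging.root;",
]
for s in statements:
    print(s)
```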
@@ -492,7 +496,7 @@ def gen_merge_sql(
     def gen_scd2_sql(
         cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]
     ) -> List[str]:
-        """Returns SQL statements for the `scd2` merge strategy.
+        """Generates SQL statements for the `scd2` merge strategy.
 
         The root table can be inserted into and updated.
         Updates only take place when a record retires (because there is a new version
@@ -505,25 +509,30 @@ def gen_scd2_sql(
         with sql_client.with_staging_dataset(staging=True):
             staging_root_table_name = sql_client.make_qualified_table_name(root_table["name"])
 
+        # get validity column names
+        escape_id = sql_client.capabilities.escape_identifier
+        from_ = escape_id(get_first_column_name_with_prop(root_table, "x-valid-from"))
+        to = escape_id(get_first_column_name_with_prop(root_table, "x-valid-to"))
+
         # define values for validity columns
         boundary_ts = current_load_package()["state"]["created_at"]
         active_record_ts = HIGH_TS.isoformat()
 
         # retire updated and deleted records
         sql.append(f"""
-            UPDATE {root_table_name} SET valid_to = '{boundary_ts}'
+            UPDATE {root_table_name} SET {to} = '{boundary_ts}'
             WHERE NOT EXISTS (
                 SELECT s._dlt_id FROM {staging_root_table_name} AS s
                 WHERE {root_table_name}._dlt_id = s._dlt_id
-            ) AND valid_to = '{active_record_ts}';
+            ) AND {to} = '{active_record_ts}';
         """)
 
         # insert new active records in root table
-        columns = list(root_table["columns"].keys())
-        col_str = ", ".join([c for c in columns if c not in ("valid_from", "valid_to")])
+        columns = map(escape_id, list(root_table["columns"].keys()))
+        col_str = ", ".join([c for c in columns if c not in (from_, to)])
         sql.append(f"""
-            INSERT INTO {root_table_name} ({col_str}, valid_from, valid_to)
-            SELECT {col_str}, '{boundary_ts}' AS valid_from, '{active_record_ts}' AS valid_to
+            INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
+            SELECT {col_str}, '{boundary_ts}' AS {from_}, '{active_record_ts}' AS {to}
             FROM {staging_root_table_name} AS s
             WHERE NOT EXISTS (SELECT s._dlt_id FROM {root_table_name} AS f WHERE f._dlt_id = s._dlt_id);
         """)
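With the default validity column names, the two generated statements above render roughly as follows. The dataset and table names are placeholders, the boundary timestamp stands in for the load package's `created_at`, and `9999-12-31...` stands in for `HIGH_TS.isoformat()`:

```python
boundary_ts = "2024-01-01T00:00:00+00:00"       # placeholder: load package created_at
active_record_ts = "9999-12-31T00:00:00+00:00"  # placeholder: HIGH_TS.isoformat()

retire_sql = f"""
UPDATE my_dataset.dim_test SET "_dlt_valid_to" = '{boundary_ts}'
WHERE NOT EXISTS (
    SELECT s._dlt_id FROM staging.dim_test AS s
    WHERE my_dataset.dim_test._dlt_id = s._dlt_id
) AND "_dlt_valid_to" = '{active_record_ts}';
"""

insert_sql = f"""
INSERT INTO my_dataset.dim_test (nk, c1, "_dlt_valid_from", "_dlt_valid_to")
SELECT nk, c1, '{boundary_ts}' AS "_dlt_valid_from", '{active_record_ts}' AS "_dlt_valid_to"
FROM staging.dim_test AS s
WHERE NOT EXISTS (SELECT s._dlt_id FROM my_dataset.dim_test AS f WHERE f._dlt_id = s._dlt_id);
"""
print(retire_sql, insert_sql)
```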
diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py
index 0fe63a846b..6e14878699 100644
--- a/dlt/extract/decorators.py
+++ b/dlt/extract/decorators.py
@@ -33,7 +33,7 @@
 from dlt.common.schema.typing import (
     TColumnNames,
     TWriteDisposition,
-    TLoaderMergeStrategy,
+    TMergeConfig,
     TAnySchemaColumns,
     TSchemaContract,
     TTableFormat,
@@ -297,7 +297,7 @@ def resource(
     columns: TTableHintTemplate[TAnySchemaColumns] = None,
     primary_key: TTableHintTemplate[TColumnNames] = None,
     merge_key: TTableHintTemplate[TColumnNames] = None,
-    merge_strategy: TLoaderMergeStrategy = None,
+    merge_config: TMergeConfig = None,
     schema_contract: TTableHintTemplate[TSchemaContract] = None,
     table_format: TTableHintTemplate[TTableFormat] = None,
     selected: bool = True,
@@ -316,7 +316,7 @@ def resource(
     columns: TTableHintTemplate[TAnySchemaColumns] = None,
     primary_key: TTableHintTemplate[TColumnNames] = None,
     merge_key: TTableHintTemplate[TColumnNames] = None,
-    merge_strategy: TLoaderMergeStrategy = None,
+    merge_config: TMergeConfig = None,
     schema_contract: TTableHintTemplate[TSchemaContract] = None,
     table_format: TTableHintTemplate[TTableFormat] = None,
     selected: bool = True,
@@ -335,7 +335,7 @@ def resource(
     columns: TTableHintTemplate[TAnySchemaColumns] = None,
     primary_key: TTableHintTemplate[TColumnNames] = None,
     merge_key: TTableHintTemplate[TColumnNames] = None,
-    merge_strategy: TLoaderMergeStrategy = None,
+    merge_config: TMergeConfig = None,
     schema_contract: TTableHintTemplate[TSchemaContract] = None,
     table_format: TTableHintTemplate[TTableFormat] = None,
     selected: bool = True,
@@ -355,7 +355,7 @@ def resource(
     columns: TTableHintTemplate[TAnySchemaColumns] = None,
     primary_key: TTableHintTemplate[TColumnNames] = None,
     merge_key: TTableHintTemplate[TColumnNames] = None,
-    merge_strategy: TLoaderMergeStrategy = None,
+    merge_config: TMergeConfig = None,
     schema_contract: TTableHintTemplate[TSchemaContract] = None,
     table_format: TTableHintTemplate[TTableFormat] = None,
     selected: bool = True,
@@ -373,7 +373,7 @@ def resource(
     columns: TTableHintTemplate[TAnySchemaColumns] = None,
     primary_key: TTableHintTemplate[TColumnNames] = None,
     merge_key: TTableHintTemplate[TColumnNames] = None,
-    merge_strategy: TLoaderMergeStrategy = None,
+    merge_config: TMergeConfig = None,
     schema_contract: TTableHintTemplate[TSchemaContract] = None,
     table_format: TTableHintTemplate[TTableFormat] = None,
     selected: bool = True,
@@ -426,7 +426,7 @@ def resource(
         merge_key (str | Sequence[str]): A column name or a list of column names that define a merge key. Typically used with "merge" write disposition to remove overlapping data ranges ie. to keep a single record for a given day.
             This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
 
-        merge_strategy (TLoaderMergeStrategy): The merge strategy to use. Only applies when the `merge` write disposition is used. If `None`, the standard strategy is used.
+        merge_config (TMergeConfig): A dictionary to customize behavior of the `merge` write disposition. Can for example be used to configure the `scd2` merge strategy.
 
         schema_contract (TSchemaContract, optional): Schema contract settings that will be applied to all resources of this source (if not overridden in the resource itself)
 
         table_format (Literal["iceberg"], optional): Defines the storage format of the table. Currently only "iceberg" is supported on Athena, other destinations ignore this hint.
@@ -458,7 +458,7 @@ def make_resource(
         columns=columns,
         primary_key=primary_key,
         merge_key=merge_key,
-        merge_strategy=merge_strategy,
+        merge_config=merge_config,
         schema_contract=schema_contract,
         table_format=table_format,
     )
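For orientation, a resource declaration using the new parameter might look like this; the resource and column names are invented, but the shape matches the tests later in this diff:

```python
import dlt

@dlt.resource(
    table_name="dim_customer",          # hypothetical table name
    write_disposition="merge",
    merge_config={"strategy": "scd2"},  # validity columns default to _dlt_valid_from/_dlt_valid_to
)
def customers():
    yield [{"nk": 1, "name": "alice"}]
```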
new_template["merge_key"] = merge_key - if merge_strategy is not None: - new_template["merge_strategy"] = merge_strategy + if merge_config is not None: + new_template["merge_config"] = merge_config if validator: new_template["validator"] = validator DltResourceHints.validate_dynamic_hints(new_template) @@ -172,17 +173,11 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab if "name" not in table_template: table_template["name"] = self.name - # add columns for `scd2` merge strategy - if "merge_strategy" in table_template and table_template["merge_strategy"] == "scd2": - table_template["columns"] = { - "valid_from": {"name": "valid_from", "data_type": "timestamp"}, - "valid_to": {"name": "valid_to", "data_type": "timestamp"}, - } - # if table template present and has dynamic hints, the data item must be provided. if self._table_name_hint_fun and item is None: raise DataItemRequiredForDynamicTableHints(self.name) # resolve + merge_config = table_template.pop("merge_config", None) resolved_template: TResourceHints = { k: self._resolve_hint(item, v) for k, v in table_template.items() @@ -190,6 +185,7 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab } # type: ignore table_schema = self._merge_keys(resolved_template) table_schema["resource"] = self.name + self._resolve_merge_config(merge_config, table_schema) validate_dict_ignoring_xkeys( spec=TTableSchema, doc=table_schema, @@ -205,7 +201,7 @@ def apply_hints( columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, merge_key: TTableHintTemplate[TColumnNames] = None, - merge_strategy: TLoaderMergeStrategy = None, + merge_config: TMergeConfig = None, incremental: Incremental[Any] = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, additional_table_hints: Optional[Dict[str, TTableHintTemplate[Any]]] = None, @@ -255,7 +251,7 @@ def apply_hints( columns, primary_key, merge_key, - merge_strategy, + merge_config, schema_contract, table_format, ) @@ -297,11 +293,11 @@ def apply_hints( t["merge_key"] = merge_key else: t.pop("merge_key", None) - if merge_strategy is not None: - if merge_strategy: - t["merge_strategy"] = merge_strategy + if merge_config is not None: + if merge_config: + t["merge_config"] = merge_config else: - t.pop("merge_strategy", None) + t.pop("merge_config", None) if schema_contract is not None: if schema_contract: t["schema_contract"] = schema_contract @@ -419,9 +415,26 @@ def _merge_keys(t_: TResourceHints) -> TPartialTableSchema: DltResourceHints._merge_key("primary_key", t_.pop("primary_key"), partial) # type: ignore if "merge_key" in t_: DltResourceHints._merge_key("merge_key", t_.pop("merge_key"), partial) # type: ignore - return partial + @staticmethod + def _resolve_merge_config( + merge_config: Optional[TMergeConfig], partial: TPartialTableSchema + ) -> None: + """Resolves `merge_config` into x-hints on `partial` table schema in place.""" + if merge_config is not None: + if "strategy" in merge_config: + partial["x-merge-strategy"] = merge_config["strategy"] # type: ignore[typeddict-unknown-key] + + # add columns for `scd2` merge strategy + if partial.get("x-merge-strategy") == "scd2": + if merge_config.get("validity_column_names") is None: + from_, to = DEFAULT_VALIDITY_COLUMN_NAMES + else: + from_, to = merge_config["validity_column_names"] + partial["columns"][from_] = {"name": from_, "data_type": "timestamp", "x-valid-from": True} # type: ignore[typeddict-unknown-key] + 
partial["columns"][to] = {"name": to, "data_type": "timestamp", "x-valid-to": True} # type: ignore[typeddict-unknown-key] + @staticmethod def validate_dynamic_hints(template: TResourceHints) -> None: table_name = template.get("name") diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index ee6b530f37..7ad7aec097 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -1317,14 +1317,20 @@ def empty_gen(): "primary_key": True, "merge_key": True, } - # test merge strategy hint - empty_r.apply_hints(merge_strategy="scd2") - assert empty_r._hints["merge_strategy"] == "scd2" + # test merge config hint + empty_r.apply_hints(merge_config={"strategy": "scd2", "validity_column_names": ["from", "to"]}) + assert empty_r._hints["merge_config"] == { + "strategy": "scd2", + "validity_column_names": ["from", "to"], + } table = empty_r.compute_table_schema() - assert "valid_from" in table["columns"] - assert "valid_to" in table["columns"] - empty_r.apply_hints(merge_strategy="") - assert "merge_strategy" not in empty_r._hints + assert table["x-merge-strategy"] == "scd2" + assert "from" in table["columns"] + assert "x-valid-from" in table["columns"]["from"] + assert "to" in table["columns"] + assert "x-valid-to" in table["columns"]["to"] + empty_r.apply_hints(merge_config="") + assert "merge_config" not in empty_r._hints def test_apply_dynamic_hints() -> None: diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 6ae460995d..487d280e74 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -6,7 +6,9 @@ import dlt from dlt.common.pipeline import LoadInfo +from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES from dlt.common.normalizers.json.relational import DataItemNormalizer +from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention from dlt.destinations.sql_jobs import HIGH_TS from tests.pipeline.utils import assert_load_info @@ -44,7 +46,9 @@ def get_table(pipeline: dlt.Pipeline, table_name: str, sort_column: str) -> List { k: strip_timezone(v) if isinstance(v, datetime) else v for k, v in r.items() - if not k.startswith("_dlt") or k == "_dlt_root_id" + if not k.startswith("_dlt") + or k in DEFAULT_VALIDITY_COLUMN_NAMES + or k == "_dlt_root_id" } for r in load_tables_to_dicts(pipeline, table_name)[table_name] ], @@ -53,18 +57,51 @@ def get_table(pipeline: dlt.Pipeline, table_name: str, sort_column: str) -> List @pytest.mark.parametrize( - "destination_config", - destinations_configs(default_sql_configs=True, supports_merge=True), - ids=lambda x: x.name, + "destination_config,simple,validity_column_names", + [ # test basic case for alle SQL destinations supporting merge + (dconf, True, None) + for dconf in destinations_configs(default_sql_configs=True, supports_merge=True) + ] + + [ # test nested columns and validity column name configuration only for postgres + ( + dconf, + False, + ["from", "to"], + ) # "from" is a SQL keyword, so this also tests if columns are escaped + for dconf in destinations_configs(default_sql_configs=True, subset=["postgres"]) + ] + + [ + (dconf, False, ["ValidFrom", "ValidTo"]) + for dconf in destinations_configs(default_sql_configs=True, subset=["postgres"]) + ], + ids=lambda x: ( + x.name + if isinstance(x, DestinationTestConfiguration) + else (x[0] + "-" + x[1] if isinstance(x, list) else x) + ), ) -@pytest.mark.parametrize("simple", [True, False]) -def test_core_functionality(destination_config: 
diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py
index 6ae460995d..487d280e74 100644
--- a/tests/load/pipeline/test_scd2.py
+++ b/tests/load/pipeline/test_scd2.py
@@ -6,7 +6,9 @@
 import dlt
 from dlt.common.pipeline import LoadInfo
+from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES
 from dlt.common.normalizers.json.relational import DataItemNormalizer
+from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
 from dlt.destinations.sql_jobs import HIGH_TS
 
 from tests.pipeline.utils import assert_load_info
@@ -44,7 +46,9 @@ def get_table(pipeline: dlt.Pipeline, table_name: str, sort_column: str) -> List
             {
                 k: strip_timezone(v) if isinstance(v, datetime) else v
                 for k, v in r.items()
-                if not k.startswith("_dlt") or k == "_dlt_root_id"
+                if not k.startswith("_dlt")
+                or k in DEFAULT_VALIDITY_COLUMN_NAMES
+                or k == "_dlt_root_id"
             }
             for r in load_tables_to_dicts(pipeline, table_name)[table_name]
         ],
@@ -53,18 +57,51 @@
 
 
 @pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, supports_merge=True),
-    ids=lambda x: x.name,
+    "destination_config,simple,validity_column_names",
+    [  # test the basic case for all SQL destinations supporting merge
+        (dconf, True, None)
+        for dconf in destinations_configs(default_sql_configs=True, supports_merge=True)
+    ]
+    + [  # test nested columns and validity column name configuration only for postgres
+        (
+            dconf,
+            False,
+            ["from", "to"],
+        )  # "from" is a SQL keyword, so this also tests if columns are escaped
+        for dconf in destinations_configs(default_sql_configs=True, subset=["postgres"])
+    ]
+    + [
+        (dconf, False, ["ValidFrom", "ValidTo"])
+        for dconf in destinations_configs(default_sql_configs=True, subset=["postgres"])
+    ],
+    ids=lambda x: (
+        x.name
+        if isinstance(x, DestinationTestConfiguration)
+        else (x[0] + "-" + x[1] if isinstance(x, list) else x)
+    ),
 )
-@pytest.mark.parametrize("simple", [True, False])
-def test_core_functionality(destination_config: DestinationTestConfiguration, simple: bool) -> None:
+def test_core_functionality(
+    destination_config: DestinationTestConfiguration,
+    simple: bool,
+    validity_column_names: List[str],
+) -> None:
     p = destination_config.setup_pipeline("abstract", full_refresh=True)
 
-    @dlt.resource(table_name="dim_test", write_disposition="merge", merge_strategy="scd2")
+    @dlt.resource(
+        table_name="dim_test",
+        write_disposition="merge",
+        merge_config={"strategy": "scd2", "validity_column_names": validity_column_names},
+    )
     def r(data):
         yield data
 
+    # get validity column names
+    from_, to = (
+        DEFAULT_VALIDITY_COLUMN_NAMES
+        if validity_column_names is None
+        else map(SnakeCaseNamingConvention().normalize_identifier, validity_column_names)
+    )
+
     # load 1 — initial load
     dim_snap = [
         {"nk": 1, "c1": "foo", "c2": "foo" if simple else {"nc1": "foo"}},
@@ -75,8 +112,8 @@ def r(data):
     assert_load_info(info)
     cname = "c2" if simple else "c2__nc1"
     assert get_table(p, "dim_test", cname) == [
-        {"valid_from": ts_1, "valid_to": ACTIVE_TS, "nk": 2, "c1": "bar", cname: "bar"},
-        {"valid_from": ts_1, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo", cname: "foo"},
+        {from_: ts_1, to: ACTIVE_TS, "nk": 2, "c1": "bar", cname: "bar"},
+        {from_: ts_1, to: ACTIVE_TS, "nk": 1, "c1": "foo", cname: "foo"},
     ]
 
     # load 2 — update a record
@@ -88,9 +125,9 @@ def r(data):
     ts_2 = get_load_package_created_at(p, info)
     assert_load_info(info)
     assert get_table(p, "dim_test", cname) == [
-        {"valid_from": ts_1, "valid_to": ACTIVE_TS, "nk": 2, "c1": "bar", cname: "bar"},
-        {"valid_from": ts_1, "valid_to": ts_2, "nk": 1, "c1": "foo", cname: "foo"},
-        {"valid_from": ts_2, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo", cname: "foo_updated"},
+        {from_: ts_1, to: ACTIVE_TS, "nk": 2, "c1": "bar", cname: "bar"},
+        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"},
+        {from_: ts_2, to: ACTIVE_TS, "nk": 1, "c1": "foo", cname: "foo_updated"},
     ]
 
     # load 3 — delete a record
@@ -101,9 +138,9 @@ def r(data):
     ts_3 = get_load_package_created_at(p, info)
     assert_load_info(info)
     assert get_table(p, "dim_test", cname) == [
-        {"valid_from": ts_1, "valid_to": ts_3, "nk": 2, "c1": "bar", cname: "bar"},
-        {"valid_from": ts_1, "valid_to": ts_2, "nk": 1, "c1": "foo", cname: "foo"},
-        {"valid_from": ts_2, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo", cname: "foo_updated"},
+        {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", cname: "bar"},
+        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"},
+        {from_: ts_2, to: ACTIVE_TS, "nk": 1, "c1": "foo", cname: "foo_updated"},
     ]
 
     # load 4 — insert a record
@@ -115,10 +152,10 @@ def r(data):
     ts_4 = get_load_package_created_at(p, info)
     assert_load_info(info)
     assert get_table(p, "dim_test", cname) == [
-        {"valid_from": ts_1, "valid_to": ts_3, "nk": 2, "c1": "bar", cname: "bar"},
-        {"valid_from": ts_4, "valid_to": ACTIVE_TS, "nk": 3, "c1": "baz", cname: "baz"},
-        {"valid_from": ts_1, "valid_to": ts_2, "nk": 1, "c1": "foo", cname: "foo"},
-        {"valid_from": ts_2, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo", cname: "foo_updated"},
+        {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", cname: "bar"},
+        {from_: ts_4, to: ACTIVE_TS, "nk": 3, "c1": "baz", cname: "baz"},
+        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"},
+        {from_: ts_2, to: ACTIVE_TS, "nk": 1, "c1": "foo", cname: "foo_updated"},
    ]
write_disposition="merge", merge_strategy="scd2") + @dlt.resource( + table_name="dim_test", write_disposition="merge", merge_config={"strategy": "scd2"} + ) def r(data): yield data + # get validity column names + from_, to = DEFAULT_VALIDITY_COLUMN_NAMES + # load 1 — initial load dim_snap: List[Dict[str, Any]] = [ l1_1 := {"nk": 1, "c1": "foo", "c2": [1] if simple else [{"cc1": 1}]}, @@ -144,8 +186,8 @@ def r(data): ts_1 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {"valid_from": ts_1, "valid_to": ACTIVE_TS, "nk": 2, "c1": "bar"}, - {"valid_from": ts_1, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo"}, + {from_: ts_1, to: ACTIVE_TS, "nk": 2, "c1": "bar"}, + {from_: ts_1, to: ACTIVE_TS, "nk": 1, "c1": "foo"}, ] cname = "value" if simple else "cc1" assert get_table(p, "dim_test__c2", cname) == [ @@ -163,9 +205,9 @@ def r(data): ts_2 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {"valid_from": ts_1, "valid_to": ACTIVE_TS, "nk": 2, "c1": "bar"}, - {"valid_from": ts_1, "valid_to": ts_2, "nk": 1, "c1": "foo"}, # updated - {"valid_from": ts_2, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo_updated"}, # new + {from_: ts_1, to: ACTIVE_TS, "nk": 2, "c1": "bar"}, + {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, # updated + {from_: ts_2, to: ACTIVE_TS, "nk": 1, "c1": "foo_updated"}, # new ] assert get_table(p, "dim_test__c2", cname) == [ {"_dlt_root_id": h(l1_1), cname: 1}, @@ -187,10 +229,10 @@ def r(data): ts_3 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {"valid_from": ts_1, "valid_to": ACTIVE_TS, "nk": 2, "c1": "bar"}, - {"valid_from": ts_1, "valid_to": ts_2, "nk": 1, "c1": "foo"}, - {"valid_from": ts_2, "valid_to": ts_3, "nk": 1, "c1": "foo_updated"}, # updated - {"valid_from": ts_3, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo_updated"}, # new + {from_: ts_1, to: ACTIVE_TS, "nk": 2, "c1": "bar"}, + {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, + {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, # updated + {from_: ts_3, to: ACTIVE_TS, "nk": 1, "c1": "foo_updated"}, # new ] exp_3 = [ {"_dlt_root_id": h(l1_1), cname: 1}, @@ -210,10 +252,10 @@ def r(data): ts_4 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {"valid_from": ts_1, "valid_to": ts_4, "nk": 2, "c1": "bar"}, # updated - {"valid_from": ts_1, "valid_to": ts_2, "nk": 1, "c1": "foo"}, - {"valid_from": ts_2, "valid_to": ts_3, "nk": 1, "c1": "foo_updated"}, - {"valid_from": ts_3, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo_updated"}, + {from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"}, # updated + {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, + {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, + {from_: ts_3, to: ACTIVE_TS, "nk": 1, "c1": "foo_updated"}, ] assert get_table(p, "dim_test__c2", cname) == exp_3 # deletes should not alter child tables @@ -226,11 +268,11 @@ def r(data): ts_5 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {"valid_from": ts_1, "valid_to": ts_4, "nk": 2, "c1": "bar"}, - {"valid_from": ts_5, "valid_to": ACTIVE_TS, "nk": 3, "c1": "baz"}, # new - {"valid_from": ts_1, "valid_to": ts_2, "nk": 1, "c1": "foo"}, - {"valid_from": ts_2, "valid_to": ts_3, "nk": 1, "c1": "foo_updated"}, - {"valid_from": ts_3, "valid_to": ACTIVE_TS, "nk": 1, "c1": "foo_updated"}, + {from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"}, + {from_: ts_5, to: 
ACTIVE_TS, "nk": 3, "c1": "baz"}, # new + {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, + {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, + {from_: ts_3, to: ACTIVE_TS, "nk": 1, "c1": "foo_updated"}, ] assert get_table(p, "dim_test__c2", cname) == [ {"_dlt_root_id": h(l1_1), cname: 1}, @@ -252,7 +294,9 @@ def r(data): def test_grandchild_table(destination_config: DestinationTestConfiguration) -> None: p = destination_config.setup_pipeline("abstract", full_refresh=True) - @dlt.resource(table_name="dim_test", write_disposition="merge", merge_strategy="scd2") + @dlt.resource( + table_name="dim_test", write_disposition="merge", merge_config={"strategy": "scd2"} + ) def r(data): yield data diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index 94683e4995..eb486c431c 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -171,7 +171,7 @@ def load_tables_to_dicts(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[D for table_name in table_names: table_rows = [] columns = p.default_schema.get_table_columns(table_name).keys() - query_columns = ",".join(columns) + query_columns = ",".join(map(p.sql_client().capabilities.escape_identifier, columns)) with p.sql_client() as c: f_q_table_name = c.make_qualified_table_name(table_name)