diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index 13677e01b3..8376c9a8d6 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -755,17 +755,26 @@ def gen_scd2_sql( active_record_timestamp = get_active_record_timestamp(root_table) if active_record_timestamp is None: active_record_literal = "NULL" - is_active_clause = f"{to} IS NULL" + is_active = f"{to} IS NULL" else: # it's a datetime active_record_literal = format_datetime_literal( active_record_timestamp, caps.timestamp_precision ) - is_active_clause = f"{to} = {active_record_literal}" + is_active = f"{to} = {active_record_literal}" - # retire updated and deleted records + retire_if_absent = root_table.get("x-retire-if-absent", True) + if retire_if_absent: + dummy_clause = caps.escape_literal(True) + else: + nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key")) + nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})" + + # retire records + # always retire updated records, retire deleted records only if `retire_if_absent` sql.append(f""" {cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal} - WHERE {is_active_clause} + WHERE {is_active} + AND {dummy_clause if retire_if_absent else nk_present} AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name}); """) @@ -776,7 +785,7 @@ def gen_scd2_sql( INSERT INTO {root_table_name} ({col_str}, {from_}, {to}) SELECT {col_str}, {boundary_literal} AS {from_}, {active_record_literal} AS {to} FROM {staging_root_table_name} AS s - WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active_clause}); + WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active}); """) # insert list elements for new active records in nested tables diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 037ebbddf9..7a30812e74 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -452,10 +452,18 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: md_dict: TMergeDispositionDict = dict_.pop("write_disposition") if merge_strategy := md_dict.get("strategy"): dict_["x-merge-strategy"] = merge_strategy - if "boundary_timestamp" in md_dict: - dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] - # add columns for `scd2` merge strategy + if merge_strategy == "scd2": + if "boundary_timestamp" in md_dict: + dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] + if "retire_if_absent" in md_dict: + dict_["x-retire-if-absent"] = md_dict["retire_if_absent"] # type: ignore[typeddict-item] + if "natural_key" in md_dict: + nk = md_dict["natural_key"] # type: ignore[typeddict-item] + if nk in dict_["columns"]: + dict_["columns"][nk]["x-natural-key"] = True + else: + dict_["columns"][nk] = {"name": nk, "x-natural-key": True} if md_dict.get("validity_column_names") is None: from_, to = DEFAULT_VALIDITY_COLUMN_NAMES else: @@ -523,13 +531,20 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}.""" ) - for ts in ("active_record_timestamp", "boundary_timestamp"): - if ts == "active_record_timestamp" and wd.get("active_record_timestamp") is None: - continue # None is allowed for active_record_timestamp - if ts in wd: - try: - ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required] - except Exception: - raise ValueError( - f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required] - ) + if wd.get("strategy") == "scd2": + for ts in ("active_record_timestamp", "boundary_timestamp"): + if ( + ts == "active_record_timestamp" + and wd.get("active_record_timestamp") is None + ): + continue # None is allowed for active_record_timestamp + if ts in wd: + try: + ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required] + except Exception: + raise ValueError( + f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required] + ) + + if "retire_if_absent" in wd and not wd["retire_if_absent"] and "natural_key" not in wd: # type: ignore[typeddict-item] + raise ValueError("`natural_key` is required when `retire_if_absent=False`") diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 6a2a7c5466..3ac30e3664 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -715,6 +715,102 @@ def r(data): ) +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, subset=["duckdb"]), + ids=lambda x: x.name, +) +def test_retire_if_absent( + destination_config: DestinationTestConfiguration, +) -> None: + p = destination_config.setup_pipeline("abstract", dev_mode=True) + + @dlt.resource( # type: ignore[call-overload] + table_name="dim_test", + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_if_absent": False, + "natural_key": "nk", + }, + ) + def r(data): + yield data + + # load 1 — initial load + dim_snap = [ + {"nk": 1, "foo": "foo"}, + {"nk": 2, "foo": "foo"}, + ] + info = p.run(r(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 2 + _, to = DEFAULT_VALIDITY_COLUMN_NAMES + # both records should be active (i.e. not retired) + assert [row[to] for row in get_table(p, "dim_test")] == [None, None] + + # load 2 — natural key 2 is absent, natural key 1 is unchanged + dim_snap = [ + {"nk": 1, "foo": "foo"}, + ] + info = p.run(r(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 2 + # both records should still be active + assert [row[to] for row in get_table(p, "dim_test")] == [None, None] + + # load 3 — natural key 2 is absent, natural key 1 has changed + dim_snap = [ + {"nk": 1, "foo": "bar"}, + ] + info = p.run(r(dim_snap), **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(p, "dim_test")["dim_test"] == 3 + boundary_ts = get_load_package_created_at(p, info) + # natural key 1 should now have two records (one retired, one active) + actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")] + expected = [{"nk": 1, to: boundary_ts}, {"nk": 1, to: None}, {"nk": 2, to: None}] + assert_records_as_set(actual, expected) # type: ignore[arg-type] + + # now test various configs + + with pytest.raises(ValueError): + # should raise because `natural_key` is required when `retire_if_absent=False` + r.apply_hints( + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_if_absent": False, + } + ) + + # `retire_if_absent=True` does not require `natural_key` + r.apply_hints( + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_if_absent": True, + } + ) + assert r.compute_table_schema()["x-retire-if-absent"] + + # user-provided hints for `natural_key` column should be respected + r.apply_hints( + columns={"nk": {"x-foo": "foo"}}, + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "retire_if_absent": False, + "natural_key": "nk", + }, + ) + assert r.compute_table_schema()["columns"]["nk"] == { + "x-foo": "foo", + "name": "nk", + "x-natural-key": True, + } + + @pytest.mark.parametrize( "destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]),