Skip to content

Commit

Permalink
add scd2 retire_if_absent option
Browse files Browse the repository at this point in the history
  • Loading branch information
jorritsandbrink committed Sep 16, 2024
1 parent a3a29ac commit aa62983
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 18 deletions.
19 changes: 14 additions & 5 deletions dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,17 +755,26 @@ def gen_scd2_sql(
active_record_timestamp = get_active_record_timestamp(root_table)
if active_record_timestamp is None:
active_record_literal = "NULL"
is_active_clause = f"{to} IS NULL"
is_active = f"{to} IS NULL"
else: # it's a datetime
active_record_literal = format_datetime_literal(
active_record_timestamp, caps.timestamp_precision
)
is_active_clause = f"{to} = {active_record_literal}"
is_active = f"{to} = {active_record_literal}"

# retire updated and deleted records
retire_if_absent = root_table.get("x-retire-if-absent", True)
if retire_if_absent:
dummy_clause = caps.escape_literal(True)
else:
nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key"))
nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})"

# retire records
# always retire updated records, retire deleted records only if `retire_if_absent`
sql.append(f"""
{cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal}
WHERE {is_active_clause}
WHERE {is_active}
AND {dummy_clause if retire_if_absent else nk_present}
AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
""")

Expand All @@ -776,7 +785,7 @@ def gen_scd2_sql(
INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
SELECT {col_str}, {boundary_literal} AS {from_}, {active_record_literal} AS {to}
FROM {staging_root_table_name} AS s
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active_clause});
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active});
""")

# insert list elements for new active records in nested tables
Expand Down
41 changes: 28 additions & 13 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,18 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
md_dict: TMergeDispositionDict = dict_.pop("write_disposition")
if merge_strategy := md_dict.get("strategy"):
dict_["x-merge-strategy"] = merge_strategy
if "boundary_timestamp" in md_dict:
dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
# add columns for `scd2` merge strategy

if merge_strategy == "scd2":
if "boundary_timestamp" in md_dict:
dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
if "retire_if_absent" in md_dict:
dict_["x-retire-if-absent"] = md_dict["retire_if_absent"] # type: ignore[typeddict-item]
if "natural_key" in md_dict:
nk = md_dict["natural_key"] # type: ignore[typeddict-item]
if nk in dict_["columns"]:
dict_["columns"][nk]["x-natural-key"] = True
else:
dict_["columns"][nk] = {"name": nk, "x-natural-key": True}
if md_dict.get("validity_column_names") is None:
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
else:
Expand Down Expand Up @@ -523,13 +531,20 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf
f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
)

for ts in ("active_record_timestamp", "boundary_timestamp"):
if ts == "active_record_timestamp" and wd.get("active_record_timestamp") is None:
continue # None is allowed for active_record_timestamp
if ts in wd:
try:
ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required]
except Exception:
raise ValueError(
f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required]
)
if wd.get("strategy") == "scd2":
for ts in ("active_record_timestamp", "boundary_timestamp"):
if (
ts == "active_record_timestamp"
and wd.get("active_record_timestamp") is None
):
continue # None is allowed for active_record_timestamp
if ts in wd:
try:
ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required]
except Exception:
raise ValueError(
f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required]
)

if "retire_if_absent" in wd and not wd["retire_if_absent"] and "natural_key" not in wd: # type: ignore[typeddict-item]
raise ValueError("`natural_key` is required when `retire_if_absent=False`")
96 changes: 96 additions & 0 deletions tests/load/pipeline/test_scd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,102 @@ def r(data):
)


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["duckdb"]),
ids=lambda x: x.name,
)
def test_retire_if_absent(
destination_config: DestinationTestConfiguration,
) -> None:
p = destination_config.setup_pipeline("abstract", dev_mode=True)

@dlt.resource( # type: ignore[call-overload]
table_name="dim_test",
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_if_absent": False,
"natural_key": "nk",
},
)
def r(data):
yield data

# load 1 — initial load
dim_snap = [
{"nk": 1, "foo": "foo"},
{"nk": 2, "foo": "foo"},
]
info = p.run(r(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 2
_, to = DEFAULT_VALIDITY_COLUMN_NAMES
# both records should be active (i.e. not retired)
assert [row[to] for row in get_table(p, "dim_test")] == [None, None]

# load 2 — natural key 2 is absent, natural key 1 is unchanged
dim_snap = [
{"nk": 1, "foo": "foo"},
]
info = p.run(r(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 2
# both records should still be active
assert [row[to] for row in get_table(p, "dim_test")] == [None, None]

# load 3 — natural key 2 is absent, natural key 1 has changed
dim_snap = [
{"nk": 1, "foo": "bar"},
]
info = p.run(r(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 3
boundary_ts = get_load_package_created_at(p, info)
# natural key 1 should now have two records (one retired, one active)
actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")]
expected = [{"nk": 1, to: boundary_ts}, {"nk": 1, to: None}, {"nk": 2, to: None}]
assert_records_as_set(actual, expected) # type: ignore[arg-type]

# now test various configs

with pytest.raises(ValueError):
# should raise because `natural_key` is required when `retire_if_absent=False`
r.apply_hints(
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_if_absent": False,
}
)

# `retire_if_absent=True` does not require `natural_key`
r.apply_hints(
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_if_absent": True,
}
)
assert r.compute_table_schema()["x-retire-if-absent"]

# user-provided hints for `natural_key` column should be respected
r.apply_hints(
columns={"nk": {"x-foo": "foo"}},
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_if_absent": False,
"natural_key": "nk",
},
)
assert r.compute_table_schema()["columns"]["nk"] == {
"x-foo": "foo",
"name": "nk",
"x-natural-key": True,
}


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["duckdb"]),
Expand Down

0 comments on commit aa62983

Please sign in to comment.