SCD2 support #1168
Changes from 10 commits
File: dlt/common/schema/typing.py

```diff
@@ -7,7 +7,6 @@
     Optional,
     Sequence,
     Set,
-    Tuple,
     Type,
     TypedDict,
     NewType,
@@ -64,7 +63,6 @@
     "dedup_sort",
 ]
 """Known hints of a column used to declare hint regexes."""
-TWriteDisposition = Literal["skip", "append", "replace", "merge"]
 TTableFormat = Literal["iceberg", "parquet", "jsonl"]
 TTypeDetections = Literal[
     "timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
@@ -86,7 +84,6 @@
         "root_key",
     ]
 )
-WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))


 class TColumnType(TypedDict, total=False):
@@ -155,6 +152,26 @@ class NormalizerInfo(TypedDict, total=True):
     new_table: bool


+TWriteDisposition = Literal["skip", "append", "replace", "merge"]
+TLoaderMergeStrategy = Literal["delete-insert", "scd2"]
+
+
+WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))
+MERGE_STRATEGIES: Set[TLoaderMergeStrategy] = set(get_args(TLoaderMergeStrategy))
+
+
+class TWriteDispositionDict(TypedDict):
+    mode: TWriteDisposition
+
+
+class TMergeDispositionDict(TWriteDispositionDict, total=False):
+    strategy: Optional[TLoaderMergeStrategy]
+    validity_column_names: Optional[List[str]]
```
Review comment: Should it allow duplicate column names?

Reply: It shouldn't, didn't think of that. Added validity column name checking in 30bb2e0. An exception is raised if a configured validity column name appears in the data.
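The check described in the reply might look roughly like this (a sketch with hypothetical function and message names; the actual implementation is in commit 30bb2e0):

```python
from typing import Sequence

def assert_no_validity_column_clash(
    validity_names: Sequence[str], data_columns: Sequence[str]
) -> None:
    # hypothetical sketch: reject duplicate configured validity column names
    # and configured names that already appear in the data
    if len(set(validity_names)) != len(list(validity_names)):
        raise ValueError("duplicate validity column names configured")
    clashes = set(validity_names) & set(data_columns)
    if clashes:
        raise ValueError(f"validity column name(s) {sorted(clashes)} found in data")
```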
```diff
+
+
+TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict]
+
+
 # TypedDict that defines properties of a table
```
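Based on the types above, a resource opting into `scd2` would pass a `TMergeDispositionDict`-shaped dict as its write disposition. A minimal sketch, assuming `dlt.resource` accepts the new `TWriteDispositionConfig` union and with illustrative column names:

```python
import dlt

@dlt.resource(
    write_disposition={
        "mode": "merge",
        "strategy": "scd2",
        "validity_column_names": ["valid_from", "valid_to"],
    }
)
def customers():
    # illustrative data; any iterable of records works
    yield [{"customer_id": 1, "name": "alice"}]
```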
File: dlt/destinations/sql_jobs.py

```diff
@@ -1,23 +1,33 @@
-from typing import Any, Callable, List, Sequence, Tuple, cast, TypedDict, Optional
+from typing import Any, List, Sequence, Tuple, cast, TypedDict, Optional

 import yaml
 from dlt.common.logger import pretty_format_exception

-from dlt.common.schema.typing import TTableSchema, TSortOrder
+from dlt.common import pendulum
+from dlt.common.schema.typing import (
+    TTableSchema,
+    TSortOrder,
+)
 from dlt.common.schema.utils import (
     get_columns_names_with_prop,
     get_first_column_name_with_prop,
     get_dedup_sort_tuple,
+    get_validity_column_names,
 )
 from dlt.common.storages.load_storage import ParsedLoadJobFileName
 from dlt.common.utils import uniq_id
 from dlt.common.destination.capabilities import DestinationCapabilitiesContext
 from dlt.destinations.exceptions import MergeDispositionException
 from dlt.destinations.job_impl import NewLoadJobImpl
 from dlt.destinations.sql_client import SqlClientBase
+from dlt.pipeline.current import load_package as current_load_package
+
+
+HIGH_TS = pendulum.datetime(9999, 12, 31)
+"""High timestamp used to indicate active records in `scd2` merge strategy."""


-class SqlJobParams(TypedDict):
+class SqlJobParams(TypedDict, total=False):
     replace: Optional[bool]
```
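The sentinel turns "is this record active?" into a plain equality test on the "valid to" column. A quick sketch (the column name here is hypothetical; the job reads the real names via `get_validity_column_names`):

```python
import pendulum

HIGH_TS = pendulum.datetime(9999, 12, 31)

def is_active(row: dict) -> bool:
    # active records carry the sentinel rather than a real retirement timestamp
    return row["valid_to"] == HIGH_TS.isoformat()

print(is_active({"valid_to": "9999-12-31T00:00:00+00:00"}))  # True
```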
```diff
@@ -40,7 +50,7 @@ def from_table_chain(

         The `table_chain` contains a list of table schemas with parent-child relationships, ordered by ancestry (the root of the tree is first on the list).
         """
-        params = cast(SqlJobParams, {**DEFAULTS, **(params or {})})  # type: ignore
+        params = cast(SqlJobParams, {**DEFAULTS, **(params or {})})
         top_table = table_chain[0]
         file_info = ParsedLoadJobFileName(
             top_table["name"], ParsedLoadJobFileName.new_file_id(), 0, "sql"
```
```diff
@@ -138,25 +148,16 @@ class SqlMergeJob(SqlBaseJob):
     failed_text: str = "Tried to generate a merge sql job for the following tables:"

     @classmethod
-    def generate_sql(
+    def generate_sql(  # type: ignore[return]
         cls,
         table_chain: Sequence[TTableSchema],
         sql_client: SqlClientBase[Any],
         params: Optional[SqlJobParams] = None,
     ) -> List[str]:
-        """Generates a list of sql statements that merge the data in the staging dataset with the data in the destination dataset.
-
-        The `table_chain` contains a list of table schemas with parent-child relationships, ordered by ancestry (the root of the tree is first on the list).
-        The root table is merged using the primary_key and merge_key hints, which can be compound and may both be specified. In that case an OR clause is generated.
-        The child tables are merged based on the propagated `root_key`, which is a type of foreign key that always leads to the root table.
-
-        First we store the root_keys of the root table elements to be deleted in a temp table. Then we use the temp table to delete records from the root and all child tables in the destination dataset.
-        At the end we copy the data from the staging dataset into the destination dataset.
-
-        If a hard_delete column is specified, records flagged as deleted are excluded from the copy into the destination dataset.
-        If a dedup_sort column is specified in conjunction with a primary key, records are sorted before deduplication so that the "latest" record remains.
-        """
-        return cls.gen_merge_sql(table_chain, sql_client)
+        if table_chain[0].get("x-merge-strategy") == "delete-insert":
+            return cls.gen_merge_sql(table_chain, sql_client)
+        elif table_chain[0].get("x-merge-strategy") == "scd2":
+            return cls.gen_scd2_sql(table_chain, sql_client)

     @classmethod
     def _gen_key_table_clauses(
```
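The dispatch keys off the root table's `x-merge-strategy` hint; the `# type: ignore[return]` reflects that no branch is taken when the hint is missing. A standalone sketch of the same logic, using hypothetical table dicts in place of `TTableSchema`:

```python
from typing import Any, Dict, Optional, Sequence

def pick_strategy(table_chain: Sequence[Dict[str, Any]]) -> Optional[str]:
    # mirror of the dispatch above: the root table's hint selects the generator
    strategy = table_chain[0].get("x-merge-strategy")
    if strategy in ("delete-insert", "scd2"):
        return strategy
    return None  # no hint means no generator (hence the type: ignore above)

print(pick_strategy([{"name": "customers", "x-merge-strategy": "scd2"}]))  # scd2
```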
```diff
@@ -333,6 +334,18 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
     def gen_merge_sql(
         cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]
     ) -> List[str]:
+        """Generates a list of sql statements that merge the data in the staging dataset with the data in the destination dataset.
+
+        The `table_chain` contains a list of table schemas with parent-child relationships, ordered by ancestry (the root of the tree is first on the list).
+        The root table is merged using the primary_key and merge_key hints, which can be compound and may both be specified. In that case an OR clause is generated.
+        The child tables are merged based on the propagated `root_key`, which is a type of foreign key that always leads to the root table.
+
+        First we store the root_keys of the root table elements to be deleted in a temp table. Then we use the temp table to delete records from the root and all child tables in the destination dataset.
+        At the end we copy the data from the staging dataset into the destination dataset.
+
+        If a hard_delete column is specified, records flagged as deleted are excluded from the copy into the destination dataset.
+        If a dedup_sort column is specified in conjunction with a primary key, records are sorted before deduplication so that the "latest" record remains.
+        """
         sql: List[str] = []
         root_table = table_chain[0]
```
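The sort-before-dedup behavior mentioned in the docstring is typically rendered with a window function. A sketch of such a statement, with hypothetical table and column names rather than the exact SQL this job emits:

```python
# keep only the "latest" row per primary key, ordered by the dedup_sort column
dedup_select = """
    SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY customer_id ORDER BY updated_at DESC
        ) AS _rn
        FROM staging_dataset.customers
    ) AS ranked
    WHERE _rn = 1;
"""
```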
```diff
@@ -478,3 +491,60 @@ def gen_merge_sql(

         sql.append(f"INSERT INTO {table_name}({col_str}) {select_sql};")
         return sql
+
+    @classmethod
+    def gen_scd2_sql(
+        cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]
+    ) -> List[str]:
+        """Generates SQL statements for the `scd2` merge strategy.
+
+        The root table can be inserted into and updated.
+        Updates only take place when a record retires (because there is a new version
+        or it is deleted) and only affect the "valid to" column.
+        Child tables are insert-only.
+        """
+        sql: List[str] = []
+        root_table = table_chain[0]
+        root_table_name = sql_client.make_qualified_table_name(root_table["name"])
+        with sql_client.with_staging_dataset(staging=True):
+            staging_root_table_name = sql_client.make_qualified_table_name(root_table["name"])
+
+        # get validity column names
+        escape_id = sql_client.capabilities.escape_identifier
+        from_, to = list(map(escape_id, get_validity_column_names(root_table)))
+
+        # define values for validity columns
+        boundary_ts = current_load_package()["state"]["created_at"]
+        active_record_ts = HIGH_TS.isoformat()
+
+        # retire updated and deleted records
+        sql.append(f"""
+            UPDATE {root_table_name} SET {to} = '{boundary_ts}'
+            WHERE NOT EXISTS (
+                SELECT s._dlt_id FROM {staging_root_table_name} AS s
+                WHERE {root_table_name}._dlt_id = s._dlt_id
+            ) AND {to} = '{active_record_ts}';
+        """)
```

Review comment: let's add x-row-hash hint that you can attach to …

Reply: We could handle three user input cases: … I suppose we would need to restrict …

Reply: Added …
```diff
+
+        # insert new active records in root table
+        columns = map(escape_id, list(root_table["columns"].keys()))
+        col_str = ", ".join([c for c in columns if c not in (from_, to)])
+
+        sql.append(f"""
+            INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
+            SELECT {col_str}, '{boundary_ts}' AS {from_}, '{active_record_ts}' AS {to}
+            FROM {staging_root_table_name} AS s
+            WHERE NOT EXISTS (SELECT s._dlt_id FROM {root_table_name} AS f WHERE f._dlt_id = s._dlt_id);
+        """)
```
```diff
+
+        # insert list elements for new active records in child tables
+        for table in table_chain[1:]:
+            table_name = sql_client.make_qualified_table_name(table["name"])
+            with sql_client.with_staging_dataset(staging=True):
+                staging_table_name = sql_client.make_qualified_table_name(table["name"])
+            sql.append(f"""
+                INSERT INTO {table_name}
+                SELECT *
+                FROM {staging_table_name} AS s
+                WHERE NOT EXISTS (SELECT s._dlt_id FROM {table_name} AS f WHERE f._dlt_id = s._dlt_id);
+            """)
+        return sql
```
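Putting the pieces together: for a chain with one child table, the returned list holds three statements executed in order. A schematic sketch with hypothetical table names and the SQL bodies elided:

```python
sql = [
    # 1. retire root-table rows that are absent from staging (UPDATE valid_to)
    "UPDATE dataset.customers SET valid_to = ...;",
    # 2. insert new active rows into the root table with fresh validity bounds
    "INSERT INTO dataset.customers (...) SELECT ...;",
    # 3. insert-only copy of new rows into each child table
    "INSERT INTO dataset.customers__orders SELECT * FROM ...;",
]
```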
Review comment (rudolfix): could you precompute a list of all "scd2" tables in the `_reset` method? This part of the schema remains constant during normalization, and this method is called for each normalized row, so it makes sense to optimize it.

Reply: Solved with caching as you suggested on Slack: 6b24378