-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SCD2 Experiment #923
SCD2 Experiment #923
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
|
||
@classmethod | ||
@classmethod | ||
def gen_scd2_sql( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will probably work for most sql destinations, I am not quite sure about the code that copies column values into the staging dataset, but we can see in the tests.
dlt/destinations/job_client_impl.py
Outdated
@@ -224,7 +224,14 @@ def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> L | |||
return [] | |||
|
|||
def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: | |||
return [SqlMergeJob.from_table_chain(table_chain, self.sql_client)] | |||
now = pendulum.now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say we need a cutoff date that is the same for the whole load. Probably we should take the timestamp when the loadpackage was created or something like this, i am just not sure if this is saved somewhere atm.
@@ -240,6 +240,15 @@ def coerce_row( | |||
updated_table_partial["columns"] = {} | |||
updated_table_partial["columns"][new_col_name] = new_col_def | |||
|
|||
# insert columns defs for scd2 (TODO: where to do this properly, maybe in a step after the normalization?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is the right place to update the schema based on destination settings? maybe we should do this at the beginning of the load step? I am not sure.
@@ -148,6 +149,14 @@ def _extend_row(extend: DictStrAny, row: TDataItemRow) -> None: | |||
def _add_row_id( | |||
self, table: str, row: TDataItemRow, parent_row_id: str, pos: int, _r_lvl: int | |||
) -> str: | |||
# sometimes row id needs to be hash for now hardcode here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for scd2 it makes the most sense to have a row_hash in the row_id..
Description
Proof of concept for scd2 implementation. This is an implementation of scd2 as described here: https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row with a valid_from and valid_until column. In this prototype we treat scd2 as a merge_strategy rather than it's own write_disposition, given the similarities between scd2 and merge this might make sense, we could also introduce a new write disposition.
Loading with this strategy always assumes a full dataset sync from the source, this way we detect missing columns and mark them as not valid anymore.
Child tables get handled the same way as their parents. All tables are retained but gain validity columns. This part still needs to be hammered out.
Open questions and ideas