SCD2 support #1168

Merged: 45 commits, Apr 14, 2024
Changes from 5 commits

Commits
720b115
format examples
Mar 31, 2024
115b4c9
add core functionality for scd2 merge strategy
Mar 31, 2024
37befbc
make scd2 validity column names configurable
Apr 1, 2024
7726d98
make alias descriptive
Apr 2, 2024
30bb2e0
add validity column name conflict checking
Apr 2, 2024
765d652
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 828-scd2
Apr 7, 2024
8f4d4ce
extend write disposition with dictionary configuration option
Apr 7, 2024
396ec59
add default delete-insert merge strategy
Apr 7, 2024
c8d84d8
update write_disposition type hints
Apr 7, 2024
11748a6
extend tested destinations
Apr 7, 2024
e9c8f61
2nd time setup (#1202)
adrianbr Apr 9, 2024
1f399bc
remove obsolete deepcopy
Apr 9, 2024
c8f4173
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 828-scd2
Apr 9, 2024
c99d612
Merge pull request #1200 from dlt-hub/devel
rudolfix Apr 9, 2024
0fa603b
add scd2 docs
Apr 9, 2024
c110aae
add write_disposition existence condition
Apr 9, 2024
55df900
add nullability hints to validity columns
Apr 9, 2024
6b24378
cache functions to limit schema lookups
Apr 10, 2024
4124d61
add row_hash_column_name config option
Apr 10, 2024
4236f20
default to default merge strategy
Apr 11, 2024
0e7f8c0
replace hardcoded column name with variable to fix test
Apr 11, 2024
93e7f45
fix doc snippets
Apr 11, 2024
36da1f2
compares records without order and with caps timestamps precision in …
rudolfix Apr 13, 2024
0d6919a
defines create load id, stores package state typed, allows package st…
rudolfix Apr 13, 2024
2195ebf
creates new package to normalize from extracted package so state is c…
rudolfix Apr 13, 2024
52f0d7b
bans direct pendulum import
rudolfix Apr 13, 2024
caa9ae7
uses timestamps with properly reduced precision in scd2
rudolfix Apr 13, 2024
64baf2d
selects newest state by load_id, not created_at. this will not affect…
rudolfix Apr 13, 2024
8cb24af
adds formating datetime literal to escape
rudolfix Apr 13, 2024
6039f1c
renames x-row-hash to x-row-version
rudolfix Apr 13, 2024
dee8e08
corrects json and pendulum imports
rudolfix Apr 13, 2024
e63ffe1
uses unique column in scd2 sql generation
rudolfix Apr 13, 2024
12bdf2b
renames arrow items literal
rudolfix Apr 13, 2024
c1614b9
adds limitations to docs
rudolfix Apr 13, 2024
a3f47fc
passes only complete columns to arrow normalize
rudolfix Apr 13, 2024
e1c53b8
renames mode to disposition
rudolfix Apr 13, 2024
c815792
Merge branch 'master' into 828-scd2
rudolfix Apr 13, 2024
900cf06
Merge branch 'devel' into 828-scd2
rudolfix Apr 13, 2024
6ed8000
saves parquet with timestamp precision corresponding to the destinati…
rudolfix Apr 13, 2024
4fde3cc
adds transform that computes hashes of tables
rudolfix Apr 13, 2024
7796b00
tests arrow/pandas + scd2
rudolfix Apr 13, 2024
feca9cd
allows scd2 columns to be added to arrow items
rudolfix Apr 13, 2024
e5c78fd
various renames
rudolfix Apr 13, 2024
d462a5b
uses generic caps when writing parquet if no destination context
rudolfix Apr 14, 2024
6944828
disables coercing timestamps in parquet arrow writer
rudolfix Apr 14, 2024
51 changes: 46 additions & 5 deletions dlt/common/normalizers/json/relational.py
@@ -1,13 +1,20 @@
from typing import Dict, List, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any
from dlt.common.data_types.typing import TDataType
from typing import Dict, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any
from dlt.common import json
from dlt.common.normalizers.exceptions import InvalidJsonNormalizer
from dlt.common.normalizers.typing import TJSONNormalizer
from dlt.common.normalizers.utils import generate_dlt_id, DLT_ID_LENGTH_BYTES

from dlt.common.typing import DictStrAny, DictStrStr, TDataItem, StrAny
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnSchema, TColumnName, TSimpleRegex
from dlt.common.schema.utils import column_name_validator
from dlt.common.schema.typing import (
TTableSchema,
TColumnSchema,
TColumnName,
TSimpleRegex,
DLT_NAME_PREFIX,
)
from dlt.common.schema.utils import column_name_validator, get_validity_column_names
from dlt.common.schema.exceptions import ColumnNameConflictException
from dlt.common.utils import digest128, update_dict_nested
from dlt.common.normalizers.json import (
TNormalizedRowIterator,
@@ -127,6 +134,18 @@ def norm_row_dicts(dict_row: StrAny, __r_lvl: int, path: Tuple[str, ...] = ()) -
norm_row_dicts(dict_row, _r_lvl)
return cast(TDataItemRow, out_rec_row), out_rec_list

@staticmethod
def get_row_hash(row: Dict[str, Any]) -> str:
"""Returns hash of row.

Hash includes column names and values and is ordered by column name.
Excludes dlt system columns.
Can be used as deterministic row identifier.
"""
row_filtered = {k: v for k, v in row.items() if not k.startswith(DLT_NAME_PREFIX)}
row_str = json.dumps(row_filtered, sort_keys=True)
return digest128(row_str, DLT_ID_LENGTH_BYTES)
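As an aside (not part of the diff): the behavior described in the docstring means two rows that differ only in dlt system columns hash identically, because `_dlt_*` keys are filtered out before the JSON dump. A small illustration, assuming the enclosing class is DataItemNormalizer:

from dlt.common.normalizers.json.relational import DataItemNormalizer

row_a = {"id": 1, "name": "foo", "_dlt_load_id": "1712345678.123"}
row_b = {"name": "foo", "id": 1, "_dlt_load_id": "1798765432.456"}
# identical hashes: key order is irrelevant (sort_keys=True) and _dlt_* columns are excluded
assert DataItemNormalizer.get_row_hash(row_a) == DataItemNormalizer.get_row_hash(row_b)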

@staticmethod
def _get_child_row_hash(parent_row_id: str, child_table: str, list_idx: int) -> str:
# create deterministic unique id of the child row taking into account that all lists are ordered
@@ -220,10 +239,14 @@ def _normalize_row(
parent_row_id: Optional[str] = None,
pos: Optional[int] = None,
_r_lvl: int = 0,
row_hash: bool = False,
) -> TNormalizedRowIterator:
schema = self.schema
table = schema.naming.shorten_fragments(*parent_path, *ident_path)

# compute row hash and set as row id
if row_hash:
row_id = self.get_row_hash(dict_row) # type: ignore[arg-type]
dict_row["_dlt_id"] = row_id
# flatten current row and extract all lists to recur into
flattened_row, lists = self._flatten(table, dict_row, _r_lvl)
# always extend row
@@ -296,10 +319,18 @@ def normalize_data_item(
row = cast(TDataItemRowRoot, item)
# identify load id if loaded data must be processed after loading incrementally
row["_dlt_load_id"] = load_id
# determine if row hash should be used as dlt id
row_hash = False
Collaborator:

could you precompute a list of all "scd2" tables in _reset method? this part of schema remains constant during normalization.
and this method is called for each normalized row. so it makes sense to optimize it

Collaborator Author:

Solved with caching as you suggested on Slack: 6b24378
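A minimal sketch of the suggested optimization, not the actual change in 6b24378: memoize the per-table lookup so the schema is consulted once per table name rather than once per normalized row. The helper name and the use of lru_cache are assumptions.

from functools import lru_cache

from dlt.common.schema import Schema


class Scd2TableLookup:
    """Caches which tables use the scd2 merge strategy; safe because the schema is constant during normalization."""

    def __init__(self, schema: Schema) -> None:
        self.schema = schema

    @lru_cache(maxsize=None)
    def is_scd2_table(self, table_name: str) -> bool:
        if table_name not in self.schema.data_table_names():
            return False
        return self.schema.get_table(table_name).get("x-merge-strategy") == "scd2"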

if table_name in self.schema.data_table_names():
table = self.schema.get_table(table_name)
if table.get("x-merge-strategy") == "scd2":
self._validate_validity_column_names(table, item)
row_hash = True
yield from self._normalize_row(
cast(TDataItemRowChild, row),
{},
(self.schema.naming.normalize_table_identifier(table_name),),
row_hash=row_hash,
)

@classmethod
@@ -333,3 +364,13 @@ def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConf
"./normalizers/json/config",
validator_f=column_name_validator(schema.naming),
)

@staticmethod
def _validate_validity_column_names(table: TTableSchema, item: TDataItem) -> None:
"""Raises exception if configured validity column name appears in data item."""
for validity_column_name in get_validity_column_names(table):
if validity_column_name in item.keys():
raise ColumnNameConflictException(
"Found column in data item with same name as validity column"
f' "{validity_column_name}".'
)
4 changes: 4 additions & 0 deletions dlt/common/schema/exceptions.py
@@ -152,3 +152,7 @@ class UnknownTableException(SchemaException):
def __init__(self, table_name: str) -> None:
self.table_name = table_name
super().__init__(f"Trying to access unknown table {table_name}.")


class ColumnNameConflictException(SchemaException):
pass
10 changes: 9 additions & 1 deletion dlt/common/schema/typing.py
@@ -7,7 +7,6 @@
Optional,
Sequence,
Set,
Tuple,
Type,
TypedDict,
NewType,
@@ -65,6 +64,7 @@
]
"""Known hints of a column used to declare hint regexes."""
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TLoaderMergeStrategy = Literal["scd2"]
Collaborator:

we should have a default strategy which is "delete-insert". and we'll add one more merge to support MERGE sql statements #1129

Collaborator Author:

Added in 396ec59
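Going by the comment and the follow-up commit 396ec59 ("add default delete-insert merge strategy"), the literal presumably ends up roughly as below; the exact definition lives in later commits, not in this 5-commit view.

TLoaderMergeStrategy = Literal["delete-insert", "scd2"]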

TTableFormat = Literal["iceberg", "parquet", "jsonl"]
TTypeDetections = Literal[
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
@@ -155,6 +155,14 @@ class NormalizerInfo(TypedDict, total=True):
new_table: bool


class TMergeConfig(TypedDict, total=False):
strategy: Optional[TLoaderMergeStrategy]
validity_column_names: Optional[List[str]]
Contributor:

Should it allow duplicate column names?

Collaborator Author:

It shouldn't, didn't think of that. Added validity column name checking in 30bb2e0. An exception is raised if a configured validity column name appears in the data.



DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"]
"""Default values for validity column names used in `scd2` merge strategy."""

# TypedDict that defines properties of a table


7 changes: 7 additions & 0 deletions dlt/common/schema/utils.py
@@ -514,6 +514,13 @@ def get_dedup_sort_tuple(
return (dedup_sort_col, dedup_sort_order)


def get_validity_column_names(table: TTableSchema) -> List[Optional[str]]:
return [
get_first_column_name_with_prop(table, "x-valid-from"),
get_first_column_name_with_prop(table, "x-valid-to"),
]


def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTables:
aggregated_update: TSchemaTables = {}
for schema_update in schema_updates:
103 changes: 87 additions & 16 deletions dlt/destinations/sql_jobs.py
@@ -1,24 +1,36 @@
from typing import Any, Callable, List, Sequence, Tuple, cast, TypedDict, Optional
from typing import Any, List, Sequence, Tuple, cast, TypedDict, Optional

import yaml
from dlt.common.runtime.logger import pretty_format_exception

from dlt.common.schema.typing import TTableSchema, TSortOrder
from dlt.common import pendulum
from dlt.common.schema.typing import (
TTableSchema,
TSortOrder,
TMergeConfig,
)
from dlt.common.schema.utils import (
get_columns_names_with_prop,
get_first_column_name_with_prop,
get_dedup_sort_tuple,
get_validity_column_names,
)
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import uniq_id
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.destinations.exceptions import MergeDispositionException
from dlt.destinations.job_impl import NewLoadJobImpl
from dlt.destinations.sql_client import SqlClientBase
from dlt.pipeline.current import load_package as current_load_package


HIGH_TS = pendulum.datetime(9999, 12, 31)
"""High timestamp used to indicate active records in `scd2` merge strategy."""


class SqlJobParams(TypedDict):
class SqlJobParams(TypedDict, total=False):
replace: Optional[bool]
merge_config: Optional[TMergeConfig]


DEFAULTS: SqlJobParams = {"replace": False}
@@ -40,7 +52,7 @@ def from_table_chain(

The `table_chain` contains a list of table schemas with parent-child relationships, ordered by ancestry (the root of the tree is first on the list).
"""
params = cast(SqlJobParams, {**DEFAULTS, **(params or {})}) # type: ignore
params = cast(SqlJobParams, {**DEFAULTS, **(params or {})})
top_table = table_chain[0]
file_info = ParsedLoadJobFileName(
top_table["name"], ParsedLoadJobFileName.new_file_id(), 0, "sql"
@@ -144,18 +156,8 @@ def generate_sql(
sql_client: SqlClientBase[Any],
params: Optional[SqlJobParams] = None,
) -> List[str]:
"""Generates a list of sql statements that merge the data in staging dataset with the data in destination dataset.

The `table_chain` contains a list schemas of a tables with parent-child relationship, ordered by the ancestry (the root of the tree is first on the list).
The root table is merged using primary_key and merge_key hints which can be compound and be both specified. In that case the OR clause is generated.
The child tables are merged based on propagated `root_key` which is a type of foreign key but always leading to a root table.

First we store the root_keys of root table elements to be deleted in the temp table. Then we use the temp table to delete records from root and all child tables in the destination dataset.
At the end we copy the data from the staging dataset into destination dataset.

If a hard_delete column is specified, records flagged as deleted will be excluded from the copy into the destination dataset.
If a dedup_sort column is specified in conjunction with a primary key, records will be sorted before deduplication, so the "latest" record remains.
"""
if table_chain[0].get("x-merge-strategy") == "scd2":
return cls.gen_scd2_sql(table_chain, sql_client)
return cls.gen_merge_sql(table_chain, sql_client)

@classmethod
@@ -333,6 +335,18 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
def gen_merge_sql(
cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]
) -> List[str]:
"""Generates a list of sql statements that merge the data in staging dataset with the data in destination dataset.

The `table_chain` contains a list schemas of a tables with parent-child relationship, ordered by the ancestry (the root of the tree is first on the list).
The root table is merged using primary_key and merge_key hints which can be compound and be both specified. In that case the OR clause is generated.
The child tables are merged based on propagated `root_key` which is a type of foreign key but always leading to a root table.

First we store the root_keys of root table elements to be deleted in the temp table. Then we use the temp table to delete records from root and all child tables in the destination dataset.
At the end we copy the data from the staging dataset into destination dataset.

If a hard_delete column is specified, records flagged as deleted will be excluded from the copy into the destination dataset.
If a dedup_sort column is specified in conjunction with a primary key, records will be sorted before deduplication, so the "latest" record remains.
"""
sql: List[str] = []
root_table = table_chain[0]

@@ -478,3 +492,60 @@ def gen_merge_sql(

sql.append(f"INSERT INTO {table_name}({col_str}) {select_sql};")
return sql
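To make the docstring above concrete, a heavily simplified sketch of the statement sequence gen_merge_sql describes, for one root table with one child table; the table, column, and dataset names are illustrative, and the real generator also handles quoting, compound keys, hard deletes, and dedup sorting.

simplified_merge_sql = [
    # 1. stage the root keys of destination records that will be replaced
    "CREATE TEMP TABLE delete_ids AS"
    " SELECT d._dlt_id FROM dataset.orders d JOIN staging.orders s ON d.order_id = s.order_id;",
    # 2. delete affected records from child tables first, then from the root table
    "DELETE FROM dataset.orders__items WHERE _dlt_root_id IN (SELECT _dlt_id FROM delete_ids);",
    "DELETE FROM dataset.orders WHERE _dlt_id IN (SELECT _dlt_id FROM delete_ids);",
    # 3. copy the new data from the staging dataset into the destination dataset
    "INSERT INTO dataset.orders SELECT * FROM staging.orders;",
    "INSERT INTO dataset.orders__items SELECT * FROM staging.orders__items;",
]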

@classmethod
def gen_scd2_sql(
cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]
) -> List[str]:
"""Generates SQL statements for the `scd2` merge strategy.

The root table can be inserted into and updated.
Updates only take place when a record retires (because there is a new version
or it is deleted) and only affect the "valid to" column.
Child tables are insert-only.
"""
sql: List[str] = []
root_table = table_chain[0]
root_table_name = sql_client.make_qualified_table_name(root_table["name"])
with sql_client.with_staging_dataset(staging=True):
staging_root_table_name = sql_client.make_qualified_table_name(root_table["name"])

# get validity column names
escape_id = sql_client.capabilities.escape_identifier
from_, to = list(map(escape_id, get_validity_column_names(root_table)))

# define values for validity columns
boundary_ts = current_load_package()["state"]["created_at"]
active_record_ts = HIGH_TS.isoformat()
rudolfix marked this conversation as resolved.

# retire updated and deleted records
sql.append(f"""
UPDATE {root_table_name} SET {to} = '{boundary_ts}'
WHERE NOT EXISTS (
SELECT s._dlt_id FROM {staging_root_table_name} AS s
WHERE {root_table_name}._dlt_id = s._dlt_id
) AND {to} = '{active_record_ts}';
Collaborator:

let's add an x-row-hash hint that you can attach to _dlt_id if it is not present in the schema (same thing we do with root_key). why:

  1. no dependency on a particular name when generating sql, but on hints
  2. users can create their own hash during extraction and e.g. take into account only 2-3 columns, not all of them. also our default hash (via json dump) will be quite slow

Collaborator Author:

We could handle three user input cases:

  1. merge_key = None (no merge key) ➜ assign x-row-hash to _dlt_id and generate row hash including all resource columns (the ones not generated by dlt)
  2. merge_key = col_x (simple merge key) ➜ assign x-row-hash to col_x and use random _dlt_id—this can be used as "Bring Your Own Hash / Surrogate Key"
  3. merge_key = [col_x, col_y, col_z] (composite merge key) ➜ assign x-row-hash to _dlt_id and generate row hash including only col_x, col_y, and col_z

I suppose we would need to restrict merge_key changes. If we don't, we would identify unchanged records as being updated because the way their hash was calculated changed.

Collaborator Author:

Added row_hash_column_name configuration option in 4124d61. Translates to x-row-hash hint behind the scenes. With this, users can bring their own hash. As discussed on Slack, let's move other extensions involving merge_key to follow up PRs.

""")
rudolfix marked this conversation as resolved.

# insert new active records in root table
columns = map(escape_id, list(root_table["columns"].keys()))
col_str = ", ".join([c for c in columns if c not in (from_, to)])
sql.append(f"""
rudolfix marked this conversation as resolved.
INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
SELECT {col_str}, '{boundary_ts}' AS {from_}, '{active_record_ts}' AS {to}
FROM {staging_root_table_name} AS s
WHERE NOT EXISTS (SELECT s._dlt_id FROM {root_table_name} AS f WHERE f._dlt_id = s._dlt_id);
""")

# insert list elements for new active records in child tables
for table in table_chain[1:]:
table_name = sql_client.make_qualified_table_name(table["name"])
with sql_client.with_staging_dataset(staging=True):
staging_table_name = sql_client.make_qualified_table_name(table["name"])
sql.append(f"""
INSERT INTO {table_name}
SELECT *
FROM {staging_table_name} AS s
WHERE NOT EXISTS (SELECT s._dlt_id FROM {table_name} AS f WHERE f._dlt_id = s._dlt_id);
""")
return sql
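Tying back to the x-row-hash discussion above: commit 4124d61 adds a row_hash_column_name config option (the hint is later renamed from x-row-hash to x-row-version in 6039f1c), which lets a source bring its own hash. A hedged sketch with illustrative names, since that commit is not part of this 5-commit view:

@dlt.resource(
    write_disposition="merge",
    merge_config={"strategy": "scd2", "row_hash_column_name": "row_hash"},
)
def dim_customer():
    # the source computes its own hash, e.g. over only the 2-3 columns it cares about
    yield {"customer_key": 1, "name": "alice", "row_hash": "precomputed-hash"}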
10 changes: 9 additions & 1 deletion dlt/extract/decorators.py
@@ -32,8 +32,8 @@
from dlt.common.schema.schema import Schema
from dlt.common.schema.typing import (
TColumnNames,
TTableSchemaColumns,
TWriteDisposition,
TMergeConfig,
TAnySchemaColumns,
TSchemaContract,
TTableFormat,
@@ -297,6 +297,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_config: TMergeConfig = None,
Collaborator:

@jorritsandbrink @sh-rp maybe instead of doing this, we could extend write_disposition to
write_disposition: TTableHintTemplate[Union[TWriteDisposition, TMergeConfig, TReplaceConfig]] = None

exactly what we do with `schema_contract` below? then we can support shorthand strings and full definitions, and still use the same parameter
in that case I'd rename TMergeConfig to TMergeDispositionConfig

Collaborator:

and we also handle the replace strategies in there? (not in the pr but later)? i think it's a good idea

Collaborator Author:

Extended write_disposition in 8f4d4ce and c8d84d8
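A hedged sketch of the resulting dictionary form of write_disposition, inferred from 8f4d4ce and the later rename of "mode" to "disposition" in e1c53b8; the exact keys are not visible in this 5-commit view:

@dlt.resource(write_disposition={"disposition": "merge", "strategy": "scd2"})
def dim_customer():
    yield {"customer_key": 1, "name": "alice"}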

schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
@@ -315,6 +316,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
@@ -333,6 +335,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
@@ -352,6 +355,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
@@ -369,6 +373,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
merge_config: TMergeConfig = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
@@ -421,6 +426,8 @@ def resource(
merge_key (str | Sequence[str]): A column name or a list of column names that define a merge key. Typically used with "merge" write disposition to remove overlapping data ranges ie. to keep a single record for a given day.
This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.

merge_config (TMergeConfig): A dictionary to customize behavior of the `merge` write disposition. Can for example be used to configure the `scd2` merge strategy.

schema_contract (TSchemaContract, optional): Schema contract settings that will be applied to all resources of this source (if not overridden in the resource itself)
table_format (Literal["iceberg"], optional): Defines the storage format of the table. Currently only "iceberg" is supported on Athena, other destinations ignore this hint.

@@ -451,6 +458,7 @@ def make_resource(
columns=columns,
primary_key=primary_key,
merge_key=merge_key,
merge_config=merge_config,
schema_contract=schema_contract,
table_format=table_format,
)