Skip to content

Commit

Permalink
finetune scd2 typing
Browse files Browse the repository at this point in the history
  • Loading branch information
jorritsandbrink committed Sep 17, 2024
1 parent 70d036d commit 5cfc5c8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
9 changes: 7 additions & 2 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,11 @@ class TWriteDispositionDict(TypedDict):
disposition: TWriteDisposition


class TMergeDispositionDict(TWriteDispositionDict, total=False):
class TMergeDispositionDict(TWriteDispositionDict):
strategy: Optional[TLoaderMergeStrategy]


class TScd2StrategyDict(TMergeDispositionDict, total=False):
validity_column_names: Optional[List[str]]
active_record_timestamp: Optional[TAnyDateTime]
boundary_timestamp: Optional[TAnyDateTime]
Expand All @@ -241,7 +244,9 @@ class TMergeDispositionDict(TWriteDispositionDict, total=False):
natural_key: Optional[str]


TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict]
TWriteDispositionConfig = Union[
TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict
]


class _TTableSchemaBase(TTableProcessingHints, total=False):
Expand Down
3 changes: 3 additions & 0 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TTableSchemaColumns,
TWriteDispositionConfig,
TMergeDispositionDict,
TScd2StrategyDict,
TAnySchemaColumns,
TTableFormat,
TSchemaContract,
Expand Down Expand Up @@ -454,6 +455,7 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
dict_["x-merge-strategy"] = merge_strategy

if merge_strategy == "scd2":
md_dict = cast(TScd2StrategyDict, md_dict)
if "boundary_timestamp" in md_dict:
dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
if "retire_if_absent" in md_dict:
Expand Down Expand Up @@ -532,6 +534,7 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf
)

if wd.get("strategy") == "scd2":
wd = cast(TScd2StrategyDict, wd)
for ts in ("active_record_timestamp", "boundary_timestamp"):
if (
ts == "active_record_timestamp"
Expand Down

0 comments on commit 5cfc5c8

Please sign in to comment.