From 5cfc5c8df3a38c14a52dd75534526d320fa342cb Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Tue, 17 Sep 2024 15:55:03 +0400 Subject: [PATCH] finetune scd2 typing --- dlt/common/schema/typing.py | 9 +++++++-- dlt/extract/hints.py | 3 +++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 855f26706a..e735190c92 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -231,8 +231,11 @@ class TWriteDispositionDict(TypedDict): disposition: TWriteDisposition -class TMergeDispositionDict(TWriteDispositionDict, total=False): +class TMergeDispositionDict(TWriteDispositionDict): strategy: Optional[TLoaderMergeStrategy] + + +class TScd2StrategyDict(TMergeDispositionDict, total=False): validity_column_names: Optional[List[str]] active_record_timestamp: Optional[TAnyDateTime] boundary_timestamp: Optional[TAnyDateTime] @@ -241,7 +244,9 @@ class TMergeDispositionDict(TWriteDispositionDict, total=False): natural_key: Optional[str] -TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict] +TWriteDispositionConfig = Union[ + TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict +] class _TTableSchemaBase(TTableProcessingHints, total=False): diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 22a7574796..f875c31634 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -12,6 +12,7 @@ TTableSchemaColumns, TWriteDispositionConfig, TMergeDispositionDict, + TScd2StrategyDict, TAnySchemaColumns, TTableFormat, TSchemaContract, @@ -454,6 +455,7 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: dict_["x-merge-strategy"] = merge_strategy if merge_strategy == "scd2": + md_dict = cast(TScd2StrategyDict, md_dict) if "boundary_timestamp" in md_dict: dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"] if "retire_if_absent" in md_dict: @@ -532,6 +534,7 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf ) if wd.get("strategy") == "scd2": + wd = cast(TScd2StrategyDict, wd) for ts in ("active_record_timestamp", "boundary_timestamp"): if ( ts == "active_record_timestamp"