Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: support lag for non-UTC datetime cursor fields #2170

Open
wants to merge 1 commit into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions dlt/common/time.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import datetime # noqa: I251
import re
import sys
from typing import Any, Optional, Union, overload, TypeVar, Callable # noqa

from pendulum.parsing import (
Expand Down Expand Up @@ -125,6 +126,38 @@ def ensure_pendulum_datetime(value: TAnyDateTime) -> pendulum.DateTime:
raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.")


def ensure_pendulum_datetime_non_utc(value: TAnyDateTime) -> pendulum.DateTime:
    """Coerce `value` to a `pendulum.DateTime`, preserving its original timezone.

    Unlike `ensure_pendulum_datetime`, aware datetimes are NOT normalized to
    UTC — the offset carried by the input (if any) is kept as-is.

    Raises:
        ValueError: if a string/number parses to a time-of-day only.
        TypeError: if `value` is of an unsupported type.
    """
    if isinstance(value, datetime.datetime):
        # pendulum.instance keeps the source tzinfo attached to the value
        return pendulum.instance(value)
    if isinstance(value, datetime.date):
        # plain date -> midnight datetime
        return pendulum.datetime(value.year, value.month, value.day)
    if isinstance(value, (int, float, str)):
        parsed = _datetime_from_ts_or_iso(value)
        if isinstance(parsed, datetime.time):
            raise ValueError(f"Cannot coerce {value} to a pendulum.DateTime object.")
        if not isinstance(parsed, pendulum.DateTime):
            # parsed to a bare date -> midnight datetime
            return pendulum.datetime(parsed.year, parsed.month, parsed.day)
        return parsed
    raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.")


def datatime_obj_to_str(
    datatime: Union[datetime.datetime, datetime.date], datetime_format: str
) -> str:
    """Format `datatime` with `datetime_format`, supporting the `%:z` directive.

    `%:z` (UTC offset with a colon, e.g. ``+02:00``) is only understood by
    `strftime` from Python 3.12 on. On older interpreters the offset is
    rendered manually and spliced into the format string, so the output
    matches the 3.12+ behavior — including a naive datetime, for which
    `%:z` expands to an empty string, and `%:z` appearing anywhere in the
    format, not only at its end.

    Args:
        datatime: the date/datetime to render (the typo in the parameter and
            function name is kept for backward compatibility with callers).
        datetime_format: a `strftime` format string, optionally using `%:z`.

    Returns:
        The formatted string.
    """
    if sys.version_info < (3, 12, 0) and "%:z" in datetime_format:
        # "%z" yields the offset as ±HHMM[SS] (empty for naive datetimes);
        # regroup the digits as ±HH:MM[:SS] to emulate "%:z".
        raw_offset = datatime.strftime("%z")
        if raw_offset:
            sign, digits = raw_offset[0], raw_offset[1:]
            # NOTE(review): offsets with fractional seconds (±HHMMSS.ffffff)
            # would not be regrouped correctly; none occur for these formats.
            colonized = sign + ":".join(
                digits[i : i + 2] for i in range(0, len(digits), 2)
            )
        else:
            # naive datetime: match Python 3.12+ where "%:z" renders as ""
            colonized = ""
        # the literal offset contains no "%" characters, so it is safe to
        # substitute it into the format string before calling strftime
        return datatime.strftime(datetime_format.replace("%:z", colonized))

    return datatime.strftime(datetime_format)


def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time:
"""Coerce a time value to a `pendulum.Time` object.

Expand Down Expand Up @@ -164,27 +197,27 @@ def detect_datetime_format(value: str) -> Optional[str]:
): "%Y-%m-%dT%H:%M:%S.%fZ", # UTC with fractional seconds
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S%z", # Positive timezone offset
): "%Y-%m-%dT%H:%M:%S%:z", # Positive timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{4}$"
): "%Y-%m-%dT%H:%M:%S%z", # Positive timezone without colon
# Full datetime with fractional seconds and positive timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S.%f%z",
): "%Y-%m-%dT%H:%M:%S.%f%:z",
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{4}$"
): "%Y-%m-%dT%H:%M:%S.%f%z", # Positive timezone without colon
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}-\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S%z", # Negative timezone offset
): "%Y-%m-%dT%H:%M:%S%:z", # Negative timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}-\d{4}$"
): "%Y-%m-%dT%H:%M:%S%z", # Negative timezone without colon
# Full datetime with fractional seconds and negative timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+-\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S.%f%z",
): "%Y-%m-%dT%H:%M:%S.%f%:z",
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+-\d{4}$"
): "%Y-%m-%dT%H:%M:%S.%f%z", # Negative Timezone without colon
Expand Down
9 changes: 8 additions & 1 deletion dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from datetime import datetime # noqa: I251
from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union, Literal, Tuple

from typing_extensions import get_args

import inspect
Expand Down Expand Up @@ -560,8 +561,14 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
else:
rows = self._transform_item(transformer, rows)

# write back state
# ensure last_value maintains forward-only progression when lag is applied
if self.lag and (cached_last_value := self._cached_state.get("last_value")):
transformer.last_value = self.last_value_func(
(transformer.last_value, cached_last_value)
)
# writing back state
self._cached_state["last_value"] = transformer.last_value

if not transformer.deduplication_disabled:
# compute hashes for new last rows
unique_hashes = set(
Expand Down
11 changes: 8 additions & 3 deletions dlt/extract/incremental/lag.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import sys
from datetime import datetime, timedelta, date # noqa: I251
from typing import Union

from dlt.common import logger
from dlt.common.time import ensure_pendulum_datetime, detect_datetime_format
from dlt.common.time import (
detect_datetime_format,
ensure_pendulum_datetime_non_utc,
datatime_obj_to_str,
)

from . import TCursorValue, LastValueFunc

Expand All @@ -17,12 +22,12 @@ def _apply_lag_to_value(
is_str = isinstance(value, str)
value_format = detect_datetime_format(value) if is_str else None
is_str_date = value_format in ("%Y%m%d", "%Y-%m-%d") if value_format else None
parsed_value = ensure_pendulum_datetime(value) if is_str else value
parsed_value = ensure_pendulum_datetime_non_utc(value) if is_str else value

if isinstance(parsed_value, (datetime, date)):
parsed_value = _apply_lag_to_datetime(lag, parsed_value, last_value_func, is_str_date) # type: ignore[assignment]
# go back to string or pass exact type
value = parsed_value.strftime(value_format) if value_format else parsed_value # type: ignore[assignment]
value = datatime_obj_to_str(parsed_value, value_format) if value_format else parsed_value # type: ignore[assignment]

elif isinstance(parsed_value, (int, float)):
value = _apply_lag_to_number(lag, parsed_value, last_value_func) # type: ignore[assignment]
Expand Down
32 changes: 28 additions & 4 deletions tests/common/test_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
datetime_to_timestamp,
datetime_to_timestamp_ms,
detect_datetime_format,
ensure_pendulum_datetime_non_utc,
)
from dlt.common.typing import TAnyDateTime
from dlt.common.time import datatime_obj_to_str


def test_timestamp_within() -> None:
Expand Down Expand Up @@ -132,21 +134,21 @@ def test_datetime_to_timestamp_helpers(
[
("2024-10-20T15:30:00Z", "%Y-%m-%dT%H:%M:%SZ"), # UTC 'Z'
("2024-10-20T15:30:00.123456Z", "%Y-%m-%dT%H:%M:%S.%fZ"), # UTC 'Z' with fractional seconds
("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%z"), # Positive timezone offset
("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%:z"), # Positive timezone offset
("2024-10-20T15:30:00+0200", "%Y-%m-%dT%H:%M:%S%z"), # Positive timezone offset (no colon)
(
"2024-10-20T15:30:00.123456+02:00",
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S.%f%:z",
), # Positive timezone offset with fractional seconds
(
"2024-10-20T15:30:00.123456+0200",
"%Y-%m-%dT%H:%M:%S.%f%z",
), # Positive timezone offset with fractional seconds (no colon)
("2024-10-20T15:30:00-02:00", "%Y-%m-%dT%H:%M:%S%z"), # Negative timezone offset
("2024-10-20T15:30:00-02:00", "%Y-%m-%dT%H:%M:%S%:z"), # Negative timezone offset
("2024-10-20T15:30:00-0200", "%Y-%m-%dT%H:%M:%S%z"), # Negative timezone offset (no colon)
(
"2024-10-20T15:30:00.123456-02:00",
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S.%f%:z",
), # Negative timezone offset with fractional seconds
(
"2024-10-20T15:30:00.123456-0200",
Expand All @@ -170,6 +172,28 @@ def test_detect_datetime_format(value, expected_format) -> None:
assert ensure_pendulum_datetime(value) is not None


@pytest.mark.parametrize(
    "datetime_str, datetime_format, expected_value",
    [
        ("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%:z", "2024-10-20T15:30:00+02:00"),
        ("2024-10-20T15:30:00+0200", "%Y-%m-%dT%H:%M:%S%z", "2024-10-20T15:30:00+0200"),
        (
            "2024-10-20T15:30:00.123456-02:00",
            "%Y-%m-%dT%H:%M:%S.%f%:z",
            "2024-10-20T15:30:00.123456-02:00",
        ),
        (
            "2024-10-20T15:30:00.123456-0200",
            "%Y-%m-%dT%H:%M:%S.%f%z",
            "2024-10-20T15:30:00.123456-0200",
        ),
    ],
)
def test_datatime_obj_to_str(datetime_str, datetime_format, expected_value) -> None:
    """Round-trip: parsing then re-formatting with %z/%:z reproduces the input."""
    # named `parsed` rather than `datetime` to avoid shadowing the stdlib module
    parsed = ensure_pendulum_datetime_non_utc(datetime_str)
    assert datatime_obj_to_str(parsed, datetime_format) == expected_value


@pytest.mark.parametrize(
"value",
[
Expand Down
33 changes: 33 additions & 0 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,39 @@ def some_data(created_at=dlt.sources.incremental("created_at", initial_value)):
assert s["last_value"] == initial_value + timedelta(minutes=4)


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_incremental_transform_return_empty_rows_with_lag(item_type: TestDataItemFormat) -> None:
    """The stored cursor must not move backwards when lag re-admits older rows.

    Uses a non-UTC (+08:00) string cursor with a 3600s lag: the second extract
    yields only a row older than the persisted last_value, and the assertion
    checks that last_value keeps the first run's maximum.
    """

    @dlt.resource
    def some_data(
        created_at=dlt.sources.incremental(
            "created_at", initial_value="2024-11-01T08:00:00+08:00", lag=3600
        )
    ):
        # `source_items` is rebound by the enclosing test body between extracts
        yield from source_items

    p = dlt.pipeline(pipeline_name=uniq_id())

    first_run_data = [{"id": 1, "value": 10, "created_at": "2024-11-01T12:00:00+08:00"}]
    source_items = data_to_item_format(item_type, first_run_data)

    p.extract(some_data())
    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
        "created_at"
    ]

    # first run establishes the cursor at the newest row, offset preserved
    assert s["last_value"] == "2024-11-01T12:00:00+08:00"

    # second run: the only row is older than the stored cursor
    second_run_data = [{"id": 1, "value": 10, "created_at": "2024-11-01T10:00:00+08:00"}]
    source_items = data_to_item_format(item_type, second_run_data)

    p.extract(some_data())
    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
        "created_at"
    ]

    # the older row must not pull the persisted last_value backwards
    assert s["last_value"] == "2024-11-01T12:00:00+08:00"


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_descending_order_unique_hashes(item_type: TestDataItemFormat) -> None:
"""Resource returns items in descending order but using `max` last value function.
Expand Down
Loading