Skip to content

Commit

Permalink
adds failing test with overlapping incremental cursor values.
Browse files Browse the repository at this point in the history
  • Loading branch information
willi-mueller committed Aug 20, 2024
1 parent 605461c commit 0b267a2
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,47 @@ def some_data(
assert s["last_value"] == 2


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_cursor_path_none_does_not_include_overlapping_records(
item_type: TestDataItemFormat,
) -> None:
@dlt.resource
def some_data(
invocation: int,
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include"),
):
if invocation == 1:
yield data_to_item_format(
item_type,
[
{"id": 1, "created_at": None},
{"id": 2, "created_at": 1},
{"id": 3, "created_at": 2},
],
)
elif invocation == 2:
yield data_to_item_format(
item_type,
[
{"id": 4, "created_at": 1},
{"id": 5, "created_at": None},
{"id": 6, "created_at": 3},
],
)

p = dlt.pipeline(pipeline_name=uniq_id())
p.run(some_data(1), destination="duckdb")
p.run(some_data(2), destination="duckdb")

assert_query_data(p, "select id from some_data order by id", [1, 2, 3, 5, 6])
assert_query_data(p, "select created_at from some_data order by created_at", [1, 2, 3, None, None])

s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
"created_at"
]
assert s["last_value"] == 3


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2(
item_type: TestDataItemFormat,
Expand Down

0 comments on commit 0b267a2

Please sign in to comment.