From 0b267a292b24f9d5ced28f88d189ae60529af6b7 Mon Sep 17 00:00:00 2001
From: Willi
Date: Tue, 20 Aug 2024 18:05:40 +0530
Subject: [PATCH] adds failing test with overlapping incremental cursor values.

---
Review note: the hunk below was edited after export (docstring added, NULL
ordering made explicit, long call wrapped). Line counts are updated; the
post-image blob id on the index line is stale.

 tests/extract/test_incremental.py | 53 +++++++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index a64e8bcdaf..cb95b39f2b 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -669,6 +669,59 @@ def some_data(
     assert s["last_value"] == 2
 
 
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+def test_cursor_path_none_does_not_include_overlapping_records(
+    item_type: TestDataItemFormat,
+) -> None:
+    """A second run must not re-load rows below the stored cursor value.
+
+    Row id 4 (created_at=1) is yielded after a first run whose highest
+    created_at is 2, so it is expected to be filtered out. Rows with a None
+    cursor (ids 1 and 5) are expected to load on both runs; the resource
+    sets on_cursor_value_missing="include".
+    """
+    @dlt.resource
+    def some_data(
+        invocation: int,
+        created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include"),
+    ):
+        if invocation == 1:
+            yield data_to_item_format(
+                item_type,
+                [
+                    {"id": 1, "created_at": None},
+                    {"id": 2, "created_at": 1},
+                    {"id": 3, "created_at": 2},
+                ],
+            )
+        elif invocation == 2:
+            yield data_to_item_format(
+                item_type,
+                [
+                    {"id": 4, "created_at": 1},
+                    {"id": 5, "created_at": None},
+                    {"id": 6, "created_at": 3},
+                ],
+            )
+
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    p.run(some_data(1), destination="duckdb")
+    p.run(some_data(2), destination="duckdb")
+
+    assert_query_data(p, "select id from some_data order by id", [1, 2, 3, 5, 6])
+    # NULL placement under a bare ORDER BY follows the engine default, so pin it.
+    assert_query_data(
+        p,
+        "select created_at from some_data order by created_at nulls last",
+        [1, 2, 3, None, None],
+    )
+
+    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
+        "created_at"
+    ]
+    assert s["last_value"] == 3
+
+
 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
 def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2(
     item_type: TestDataItemFormat,