From a2a572059773b37423455054df25f5256d84afe7 Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Sat, 17 Feb 2024 02:58:17 +0000 Subject: [PATCH 01/17] Added info about file compression --- .../docs/dlt-ecosystem/destinations/filesystem.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 7b32132361..de18ff6c6d 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -185,6 +185,17 @@ bucket_url = "file:///absolute/path" # three / for absolute path - `replace` - all files that belong to such tables are deleted from dataset folder, and then the current set of files is added. - `merge` - falls back to `append` +## File Compression + +The filesystem destination in the dlt library uses gzip compression by default for efficiency, which may result in the files being stored in a compressed format. This format may not be easily readable as plain text or JSON Lines (jsonl) files. If you encounter files that seem unreadable, they may be compressed. + +To handle compressed files: + +- To decompress a gzip file, you can use tools like gunzip. This will convert the compressed file back to its original format, making it readable. +- To disable compression, you can modify the data_writer.disable_compression setting in your config.toml file. This can be useful if you want to access the files directly without needing to decompress them. + +For more details on managing file compression, please visit our documentation on performance optimization: [Disabling and Enabling File Compression](https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression). + ## Data loading All the files are stored in a single folder with the name of the dataset that you passed to the `run` or `load` methods of `pipeline`. In our example chess pipeline it is **chess_players_games_data**. From 31fb1fe78cf5d1a1cd953ff0b87c19c63c8fcb9f Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Sat, 17 Feb 2024 03:04:50 +0000 Subject: [PATCH 02/17] update --- docs/website/docs/dlt-ecosystem/destinations/filesystem.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index de18ff6c6d..a31b54b3a2 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -191,8 +191,9 @@ The filesystem destination in the dlt library uses gzip compression by default f To handle compressed files: +- To disable compression, you can modify the `data_writer.disable_compression` setting in your "config.toml" file. This can be useful if you want to access the files directly without needing to decompress them. + - To decompress a gzip file, you can use tools like gunzip. This will convert the compressed file back to its original format, making it readable. -- To disable compression, you can modify the data_writer.disable_compression setting in your config.toml file. This can be useful if you want to access the files directly without needing to decompress them. 
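As a quick illustration of the decompression point above, a gzip-compressed JSONL file written by the filesystem destination can also be read directly from Python without unpacking it on disk. The sketch below uses only the standard library; the file path is hypothetical and not taken from the patch:

```python
import gzip
import json

# Hypothetical path to a compressed JSONL file produced by the filesystem destination
path = "chess_players_games_data/players_games/1666000000.123456.jsonl"

# gzip.open handles the compressed content regardless of the file extension
with gzip.open(path, "rt", encoding="utf-8") as f:
    for line in f:
        print(json.loads(line))
```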
For more details on managing file compression, please visit our documentation on performance optimization: [Disabling and Enabling File Compression](https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression). From b47eebbc8d8b67aa0c6b791100395763f8ccd7ca Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Thu, 22 Feb 2024 01:53:54 +0000 Subject: [PATCH 03/17] Updated --- .../docs/dlt-ecosystem/destinations/filesystem.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index a31b54b3a2..177f17b373 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -187,13 +187,17 @@ bucket_url = "file:///absolute/path" # three / for absolute path ## File Compression -The filesystem destination in the dlt library uses gzip compression by default for efficiency, which may result in the files being stored in a compressed format. This format may not be easily readable as plain text or JSON Lines (jsonl) files. If you encounter files that seem unreadable, they may be compressed. +The filesystem destination in the dlt library uses `gzip` compression by default for efficiency, which may result in the files being stored in a compressed format. This format may not be easily readable as plain text or JSON Lines (`jsonl`) files. If you encounter files that seem unreadable, they may be compressed. To handle compressed files: -- To disable compression, you can modify the `data_writer.disable_compression` setting in your "config.toml" file. This can be useful if you want to access the files directly without needing to decompress them. +- To disable compression, you can modify the `data_writer.disable_compression` setting in your "config.toml" file. This can be useful if you want to access the files directly without needing to decompress them. For example: + ```toml + [normalize.data_writer] + disable_compression=false + ``` -- To decompress a gzip file, you can use tools like gunzip. This will convert the compressed file back to its original format, making it readable. +- To decompress a `gzip` file, you can use tools like `gunzip`. This will convert the compressed file back to its original format, making it readable. For more details on managing file compression, please visit our documentation on performance optimization: [Disabling and Enabling File Compression](https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression). From 48aae2974b8ab14036e119f42145d05af74a84a2 Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Thu, 22 Feb 2024 12:47:57 +0000 Subject: [PATCH 04/17] Updated --- docs/website/docs/dlt-ecosystem/destinations/filesystem.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 177f17b373..c7ab8ed9d2 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -194,7 +194,7 @@ To handle compressed files: - To disable compression, you can modify the `data_writer.disable_compression` setting in your "config.toml" file. This can be useful if you want to access the files directly without needing to decompress them. 
For example: ```toml [normalize.data_writer] - disable_compression=false + disable_compression=true ``` - To decompress a `gzip` file, you can use tools like `gunzip`. This will convert the compressed file back to its original format, making it readable. From bd70a3e26de7d412094e0c0ff57c84856f554f80 Mon Sep 17 00:00:00 2001 From: adrianbr Date: Wed, 28 Feb 2024 16:49:46 +0100 Subject: [PATCH 05/17] add pyairbyte blog (#1025) --- .../blog/2024-02-28-what-is-pyairbyte.md | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 docs/website/blog/2024-02-28-what-is-pyairbyte.md diff --git a/docs/website/blog/2024-02-28-what-is-pyairbyte.md b/docs/website/blog/2024-02-28-what-is-pyairbyte.md new file mode 100644 index 0000000000..eb24569f10 --- /dev/null +++ b/docs/website/blog/2024-02-28-what-is-pyairbyte.md @@ -0,0 +1,40 @@ +--- +slug: what-is-pyairbyte +title: "PyAirbyte - what it is and what it’s not" +image: https://storage.googleapis.com/dlt-blog-images/pysquid.png +authors: + name: Adrian Brudaru + title: Open source Data Engineer + url: https://github.com/adrianbr + image_url: https://avatars.githubusercontent.com/u/5762770?v=4 +tags: [data observability, data pipeline observability] +--- + +## Intro + +Here at dltHub, we work on the python library for data ingestion. So when I heard from Airbyte that they are building a library, I was intrigued and decided to investigate. + +# What is PyAirbyte? + +PyAirbyte is an interesting Airbyte’s initiative - similar to the one that Meltano had undertook 3 years ago. It provides a convenient way to download and install Airbyte sources and run them locally storing the data in a cache dataset. Users are allowed to then read the data from this cache. + +A Python wrapper on the Airbyte source is quite nice and has a feeling close to [Alto]. The whole process of cloning/pip installing the repository, spawning a separate process to run Airbyte connector and read the data via UNIX pipe is hidden behind Pythonic interface. + +Note that this library is not an Airbyte replacement - the loaders of Airbyte and the library are very different. The library loader uses pandas.to_sql and sql alchemy and is not a replacement for Airbyte destinations that are available in Open Source Airbyte + +# Questions I had, answered + +- Can I run Airbyte sources with PyAirbyte? A subset of them. +- Can I use PyAirbyte to run a demo pipeline in a colab notebook? Yes. +- Would my colab demo have a compatible schema with Airbyte? No. +- Is PyAirbyte a replacement for Airbyte? No. +- Can I use PyAirbyte to develop or test during development Airbyte sources? No. +- Can I develop pipelines with PyAirbyte? no + +# In conclusion + +In wrapping up, it's clear that PyAirbyte is a neat little addition to the toolkit for those of us who enjoy tinkering with data in more casual or exploratory settings. I think this is an interesting initiative from Airbyte that will enable new usage patterns. + +### Want to discuss? + +[Join our slack community](https://dlthub.com/community) to take part in the conversation. 
\ No newline at end of file From cc66502f7ca35fd29bd28bce992614c8e4450e66 Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Wed, 28 Feb 2024 17:16:52 +0100 Subject: [PATCH 06/17] Update zendesk code example test --- .../docs/examples/incremental_loading/code/zendesk-snippets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/examples/incremental_loading/code/zendesk-snippets.py b/docs/website/docs/examples/incremental_loading/code/zendesk-snippets.py index 49893fe74e..5ec3015741 100644 --- a/docs/website/docs/examples/incremental_loading/code/zendesk-snippets.py +++ b/docs/website/docs/examples/incremental_loading/code/zendesk-snippets.py @@ -140,4 +140,4 @@ def get_pages( # check that stuff was loaded row_counts = pipeline.last_trace.last_normalize_info.row_counts - assert row_counts["ticket_events"] == 16 + assert row_counts["ticket_events"] >= 17 From f21689b483444b5c6b365a3d1cc881251f3c6914 Mon Sep 17 00:00:00 2001 From: adrianbr Date: Thu, 29 Feb 2024 05:59:48 +0100 Subject: [PATCH 07/17] fix missing blog link (#1030) --- docs/website/blog/2024-02-28-what-is-pyairbyte.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/website/blog/2024-02-28-what-is-pyairbyte.md b/docs/website/blog/2024-02-28-what-is-pyairbyte.md index eb24569f10..ffacb1c2d5 100644 --- a/docs/website/blog/2024-02-28-what-is-pyairbyte.md +++ b/docs/website/blog/2024-02-28-what-is-pyairbyte.md @@ -18,7 +18,9 @@ Here at dltHub, we work on the python library for data ingestion. So when I hear PyAirbyte is an interesting Airbyte’s initiative - similar to the one that Meltano had undertook 3 years ago. It provides a convenient way to download and install Airbyte sources and run them locally storing the data in a cache dataset. Users are allowed to then read the data from this cache. -A Python wrapper on the Airbyte source is quite nice and has a feeling close to [Alto]. The whole process of cloning/pip installing the repository, spawning a separate process to run Airbyte connector and read the data via UNIX pipe is hidden behind Pythonic interface. + +A Python wrapper on the Airbyte source is quite nice and has a feeling close to [Alto](https://github.com/z3z1ma/alto). The whole process of cloning/pip installing the repository, spawning a separate process to run Airbyte connector and read the data via UNIX pipe is hidden behind Pythonic interface. + Note that this library is not an Airbyte replacement - the loaders of Airbyte and the library are very different. The library loader uses pandas.to_sql and sql alchemy and is not a replacement for Airbyte destinations that are available in Open Source Airbyte From cfb1f917832ef658c5730a7a30eaa01f95cab17d Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Tue, 5 Mar 2024 15:30:01 +0530 Subject: [PATCH 08/17] Docs/Updated for slack alerts. (#1042) * Updated for slack alerts. 
* Updated * Updated with production example * updated * Updated chess production * Updated --- .../docs/examples/chess_production/index.md | 9 ++++++ .../docs/running-in-production/alerting.md | 32 +++++++++++++++++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index d0f87df7d2..ea747288e5 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -177,6 +177,15 @@ def load_data_with_retry(pipeline, data): ``` +:::warning +To run this example you need to provide Slack incoming hook in `.dlt/secrets.toml`: +```python +[runtime] +slack_incoming_hook="https://hooks.slack.com/services/***" +``` +Read [Using Slack to send messages.](https://dlthub.com/docs/running-in-production/running#using-slack-to-send-messages) +::: + ### Run the pipeline diff --git a/docs/website/docs/running-in-production/alerting.md b/docs/website/docs/running-in-production/alerting.md index 65a9d05eae..2eb787359d 100644 --- a/docs/website/docs/running-in-production/alerting.md +++ b/docs/website/docs/running-in-production/alerting.md @@ -40,5 +40,33 @@ receiving rich information on executed pipelines, including encountered errors a ## Slack -Read [here](./running#using-slack-to-send-messages) about how to send -messages to Slack. +Alerts can be sent to a Slack channel via Slack's incoming webhook URL. The code snippet below demonstrates automated Slack notifications for database table updates using the `send_slack_message` function. + +```python +# Import the send_slack_message function from the dlt library +from dlt.common.runtime.slack import send_slack_message + +# Define the URL for your Slack webhook +hook = "https://hooks.slack.com/services/xxx/xxx/xxx" + +# Iterate over each package in the load_info object +for package in info.load_packages: + # Iterate over each table in the schema_update of the current package + for table_name, table in package.schema_update.items(): + # Iterate over each column in the current table + for column_name, column in table["columns"].items(): + # Send a message to the Slack channel with the table + # and column update information + send_slack_message( + hook, + message=( + f"\tTable updated: {table_name}: " + f"Column changed: {column_name}: " + f"{column['data_type']}" + ) + ) +``` +Refer to this [example](../../docs/examples/chess_production/) for a practical application of the method in a production environment. + +Similarly, Slack notifications can be extended to include information on pipeline execution times, loading durations, schema modifications, and more. For comprehensive details on configuring and sending messages to Slack, please read [here](./running#using-slack-to-send-messages). 
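As a rough sketch of that extension, run duration can be measured around `pipeline.run()` and reported with the same `send_slack_message` helper shown above. The webhook URL and the pipeline below are placeholders used only to illustrate the idea:

```python
import time

import dlt
from dlt.common.runtime.slack import send_slack_message

# Placeholder webhook URL and a throwaway pipeline for illustration
hook = "https://hooks.slack.com/services/xxx/xxx/xxx"
pipeline = dlt.pipeline(pipeline_name="alerting_demo", destination="duckdb")

# Time the load and report the elapsed duration to Slack
start = time.time()
load_info = pipeline.run([{"id": 1}, {"id": 2}], table_name="example_rows")
elapsed = time.time() - start

send_slack_message(
    hook,
    message=f"Pipeline {pipeline.pipeline_name} finished in {elapsed:.1f}s, loads: {load_info.loads_ids}",
)
```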
+ From 9410bc4450290e794bc7011dfdd56c54bfb5af88 Mon Sep 17 00:00:00 2001 From: David Scharf Date: Tue, 5 Mar 2024 11:47:54 +0100 Subject: [PATCH 09/17] change timing in round robin test (#1049) --- tests/extract/test_extract_pipe.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index d5be5808c0..577dca088c 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -73,7 +73,7 @@ def test_rotation_on_none() -> None: def source_gen1(): gen_1_started = time.time() yield None - while time.time() - gen_1_started < 0.6: + while time.time() - gen_1_started < 3: time.sleep(0.05) yield None yield 1 @@ -81,7 +81,7 @@ def source_gen1(): def source_gen2(): gen_2_started = time.time() yield None - while time.time() - gen_2_started < 0.2: + while time.time() - gen_2_started < 1: time.sleep(0.05) yield None yield 2 @@ -89,7 +89,7 @@ def source_gen2(): def source_gen3(): gen_3_started = time.time() yield None - while time.time() - gen_3_started < 0.4: + while time.time() - gen_3_started < 2: time.sleep(0.05) yield None yield 3 @@ -106,7 +106,7 @@ def get_pipes(): # items will be round robin, nested iterators are fully iterated and appear inline as soon as they are encountered assert [pi.item for pi in _l] == [2, 3, 1] # jobs should have been executed in parallel - assert time.time() - started < 0.8 + assert time.time() - started < 3.5 def test_add_step() -> None: From b15a71c65a513ce46f74fde71ff8880b685e8b1f Mon Sep 17 00:00:00 2001 From: Adrian Date: Tue, 5 Mar 2024 13:06:03 +0100 Subject: [PATCH 10/17] fix link image --- .../blog/2023-06-21-open-api-spec-for-dlt-init.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/website/blog/2023-06-21-open-api-spec-for-dlt-init.md b/docs/website/blog/2023-06-21-open-api-spec-for-dlt-init.md index 4d46b370b5..6edb0f44a7 100644 --- a/docs/website/blog/2023-06-21-open-api-spec-for-dlt-init.md +++ b/docs/website/blog/2023-06-21-open-api-spec-for-dlt-init.md @@ -13,17 +13,17 @@ Today we are releasing a proof of concept of the [`dlt init`](https://dlthub.com If you build APIs, for example with [FastAPI](https://fastapi.tiangolo.com/), you can, thanks to the [OpenAPI spec,](https://spec.openapis.org/oas/v3.1.0) automatically generate a [python client](https://pypi.org/project/openapi-python-client/0.6.0a4/) and give it to your users. Our demo takes this a step further and enables you to generate advanced `dlt` pipelines that, in essence, convert your API into a live dataset. -You can see how Marcin generates such a pipeline from the OpenAPI spec using the [Pokemon API](https://pokeapi.co/) in the Loom below. -[![marcin-demo](https://camo.githubusercontent.com/1aca1132999dde59bc5b274aeb4d01c79eab525941362491a534ddd8d1015dce/68747470733a2f2f63646e2e6c6f6f6d2e636f6d2f73657373696f6e732f7468756d626e61696c732f32383036623837336261316334653065613338326562336234666261663830382d776974682d706c61792e676966)](https://www.loom.com/share/2806b873ba1c4e0ea382eb3b4fbaf808?sid=501add8b-90a0-4734-9620-c6184d840995) - +You can see how Marcin generates such a pipeline from the OpenAPI spec using the [Pokemon API](https://pokeapi.co/) in the Loom below. 
+[![marcin-demo](https://storage.googleapis.com/dlt-blog-images/openapi_loom_old.png)](https://www.loom.com/share/2806b873ba1c4e0ea382eb3b4fbaf808?sid=501add8b-90a0-4734-9620-c6184d840995) + Part of our vision is that each API will come with a `dlt` pipeline - similar to how these days often it comes with a python client. We believe that very often API users do not really want to deal with endpoints, http requests, and JSON responses. They need live, evolving datasets that they can place anywhere they want so that it's accessible to any workflow. We believe that API builders will bundle `dlt` pipelines with their APIs only if such a process is hassle free. One answer to that is code generation and the reuse of information from the OpenAPI spec. -This release is a part of a bigger vision for `dlt` of a world centered around accessible data for modern data teams. In these new times code is becoming more disposable, but the data stays valuable. We eventually want to create an ecosystem where hundreds of thousands of pipelines will be created, shared, and deployed. Where datasets, reports, and analytics can be written and shared publicly and privately. [Code generation is automation on steroids](https://dlthub.com/product/#code-generation-is-automation-on-steroids) and we are going to be releasing many more features based on this principle. - -## Generating a pipeline for PokeAPI using OpenAPI spec - +This release is a part of a bigger vision for `dlt` of a world centered around accessible data for modern data teams. In these new times code is becoming more disposable, but the data stays valuable. We eventually want to create an ecosystem where hundreds of thousands of pipelines will be created, shared, and deployed. Where datasets, reports, and analytics can be written and shared publicly and privately. [Code generation is automation on steroids](https://dlthub.com/product/#code-generation-is-automation-on-steroids) and we are going to be releasing many more features based on this principle. + +## Generating a pipeline for PokeAPI using OpenAPI spec + In the embedded loom you saw Marcin pull data from the `dlt` pipeline created from the OpenAPI spec. The proof of concept already uses a few tricks and heuristics to generate useful code. Contrary to what you may think, PokeAPI is a complex one with a lot of linked data types and endpoints! - It created a resource for all endpoints that return lists of objects. 
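For context, a hand-written resource for one such list endpoint might look roughly like the sketch below. This is an approximation for illustration, not the code the generator actually emits; the pagination follows PokeAPI's `results`/`next` response shape:

```python
import dlt
from dlt.sources.helpers import requests


@dlt.resource(table_name="pokemon", write_disposition="replace")
def pokemon_list(base_url: str = "https://pokeapi.co/api/v2"):
    # PokeAPI list endpoints return {"count", "next", "previous", "results"}
    url = f"{base_url}/pokemon"
    while url:
        page = requests.get(url).json()
        yield page["results"]
        url = page.get("next")


if __name__ == "__main__":
    pipeline = dlt.pipeline(pipeline_name="pokeapi_list", destination="duckdb")
    print(pipeline.run(pokemon_list()))
```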
From d9bae5ac631705ca6d37c7baad69fb6d8a937db8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Willi=20M=C3=BCller?= Date: Tue, 5 Mar 2024 16:26:06 +0100 Subject: [PATCH 11/17] adds test case where payload data contains PUA unicode characters (#1053) --- tests/pipeline/test_pipeline.py | 51 +++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index e1f7397ef9..85fd7d9f34 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1663,3 +1663,54 @@ def api_fetch(page_num): load_info = pipeline.run(product()) assert_load_info(load_info) assert pipeline.last_trace.last_normalize_info.row_counts["product"] == 12 + + +def test_run_with_pua_payload() -> None: + # prepare some data and complete load with run + os.environ["COMPLETED_PROB"] = "1.0" + pipeline_name = "pipe_" + uniq_id() + p = dlt.pipeline(pipeline_name=pipeline_name, destination="duckdb") + print(pipeline_name) + from dlt.common.json import PUA_START, PUA_CHARACTER_MAX + + def some_data(): + yield from [ + # text is only PUA + {"id": 1, "text": chr(PUA_START)}, + {"id": 2, "text": chr(PUA_START - 1)}, + {"id": 3, "text": chr(PUA_START + 1)}, + {"id": 4, "text": chr(PUA_START + PUA_CHARACTER_MAX + 1)}, + # PUA inside text + {"id": 5, "text": f"a{chr(PUA_START)}b"}, + {"id": 6, "text": f"a{chr(PUA_START - 1)}b"}, + {"id": 7, "text": f"a{chr(PUA_START + 1)}b"}, + # text starts with PUA + {"id": 8, "text": f"{chr(PUA_START)}a"}, + {"id": 9, "text": f"{chr(PUA_START - 1)}a"}, + {"id": 10, "text": f"{chr(PUA_START + 1)}a"}, + ] + + @dlt.source + def source(): + return dlt.resource(some_data(), name="pua_data") + + load_info = p.run(source()) + assert p.last_trace.last_normalize_info.row_counts["pua_data"] == 10 + + with p.sql_client() as client: + rows = client.execute_sql("SELECT text FROM pua_data ORDER BY id") + + values = [r[0] for r in rows] + assert values == [ + "\uf026", + "\uf025", + "\uf027", + "\uf02f", + "a\uf026b", + "a\uf025b", + "a\uf027b", + "\uf026a", + "\uf025a", + "\uf027a", + ] + assert len(load_info.loads_ids) == 1 From 4b9446c7d877ab940c53060f11d767b157dc78bb Mon Sep 17 00:00:00 2001 From: David Scharf Date: Tue, 5 Mar 2024 17:02:18 +0100 Subject: [PATCH 12/17] fix add_limit behavior in edge cases (#1052) * fix add_limit behavior in edge cases * update docs --- dlt/extract/incremental/__init__.py | 1 - dlt/extract/resource.py | 9 +++++++ docs/examples/connector_x_arrow/load_arrow.py | 2 ++ docs/examples/google_sheets/google_sheets.py | 5 +++- docs/examples/incremental_loading/zendesk.py | 8 +++--- docs/examples/nested_data/nested_data.py | 2 ++ .../pdf_to_weaviate/pdf_to_weaviate.py | 5 +++- docs/examples/qdrant_zendesk/qdrant.py | 9 ++++--- docs/examples/transformers/pokemon.py | 4 ++- docs/website/docs/general-usage/resource.md | 3 +++ tests/extract/test_sources.py | 26 +++++++++++++++++++ 11 files changed, 62 insertions(+), 12 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index d1a5a05c34..24495ccb19 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -7,7 +7,6 @@ from functools import wraps - import dlt from dlt.common.exceptions import MissingDependencyException from dlt.common import pendulum, logger diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index e5b83b853b..7159aff0b1 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -313,8 +313,17 @@ def add_limit(self, 
max_items: int) -> "DltResource": # noqa: A003 "DltResource": returns self """ + # make sure max_items is a number, to allow "None" as value for unlimited + if max_items is None: + max_items = -1 + def _gen_wrap(gen: TPipeStep) -> TPipeStep: """Wrap a generator to take the first `max_items` records""" + + # zero items should produce empty generator + if max_items == 0: + return + count = 0 is_async_gen = False if inspect.isfunction(gen): diff --git a/docs/examples/connector_x_arrow/load_arrow.py b/docs/examples/connector_x_arrow/load_arrow.py index 06ca4e17b3..b3c654cef9 100644 --- a/docs/examples/connector_x_arrow/load_arrow.py +++ b/docs/examples/connector_x_arrow/load_arrow.py @@ -3,6 +3,7 @@ import dlt from dlt.sources.credentials import ConnectionStringCredentials + def read_sql_x( conn_str: ConnectionStringCredentials = dlt.secrets.value, query: str = dlt.config.value, @@ -14,6 +15,7 @@ def read_sql_x( protocol="binary", ) + def genome_resource(): # create genome resource with merge on `upid` primary key genome = dlt.resource( diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py index 8a93df9970..1ba330e4ca 100644 --- a/docs/examples/google_sheets/google_sheets.py +++ b/docs/examples/google_sheets/google_sheets.py @@ -9,6 +9,7 @@ ) from dlt.common.typing import DictStrAny, StrAny + def _initialize_sheets( credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials] ) -> Any: @@ -16,6 +17,7 @@ def _initialize_sheets( service = build("sheets", "v4", credentials=credentials.to_native_credentials()) return service + @dlt.source def google_spreadsheet( spreadsheet_id: str, @@ -55,6 +57,7 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: for name in sheet_names ] + if __name__ == "__main__": pipeline = dlt.pipeline(destination="duckdb") # see example.secrets.toml to where to put credentials @@ -67,4 +70,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: sheet_names=range_names, ) ) - print(info) \ No newline at end of file + print(info) diff --git a/docs/examples/incremental_loading/zendesk.py b/docs/examples/incremental_loading/zendesk.py index 4b8597886a..6113f98793 100644 --- a/docs/examples/incremental_loading/zendesk.py +++ b/docs/examples/incremental_loading/zendesk.py @@ -6,12 +6,11 @@ from dlt.common.typing import TAnyDateTime from dlt.sources.helpers.requests import client + @dlt.source(max_table_nesting=2) def zendesk_support( credentials: Dict[str, str] = dlt.secrets.value, - start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008 - year=2000, month=1, day=1 - ), + start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008 end_date: Optional[TAnyDateTime] = None, ): """ @@ -113,6 +112,7 @@ def get_pages( if not response_json["end_of_stream"]: get_url = response_json["next_page"] + if __name__ == "__main__": # create dlt pipeline pipeline = dlt.pipeline( @@ -120,4 +120,4 @@ def get_pages( ) load_info = pipeline.run(zendesk_support()) - print(load_info) \ No newline at end of file + print(load_info) diff --git a/docs/examples/nested_data/nested_data.py b/docs/examples/nested_data/nested_data.py index 3464448de6..7f85f0522e 100644 --- a/docs/examples/nested_data/nested_data.py +++ b/docs/examples/nested_data/nested_data.py @@ -13,6 +13,7 @@ CHUNK_SIZE = 10000 + # You can limit how deep dlt goes when generating child tables. # By default, the library will descend and generate child tables # for all nested lists, without a limit. 
@@ -81,6 +82,7 @@ def load_documents(self) -> Iterator[TDataItem]: while docs_slice := list(islice(cursor, CHUNK_SIZE)): yield map_nested_in_place(convert_mongo_objs, docs_slice) + def convert_mongo_objs(value: Any) -> Any: if isinstance(value, (ObjectId, Decimal128)): return str(value) diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py index 8f7833e7d7..e7f57853ed 100644 --- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py +++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py @@ -4,6 +4,7 @@ from dlt.destinations.impl.weaviate import weaviate_adapter from PyPDF2 import PdfReader + @dlt.resource(selected=False) def list_files(folder_path: str): folder_path = os.path.abspath(folder_path) @@ -15,6 +16,7 @@ def list_files(folder_path: str): "mtime": os.path.getmtime(file_path), } + @dlt.transformer(primary_key="page_id", write_disposition="merge") def pdf_to_text(file_item, separate_pages: bool = False): if not separate_pages: @@ -28,6 +30,7 @@ def pdf_to_text(file_item, separate_pages: bool = False): page_item["page_id"] = file_item["file_name"] + "_" + str(page_no) yield page_item + pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate") # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf" @@ -51,4 +54,4 @@ def pdf_to_text(file_item, separate_pages: bool = False): client = weaviate.Client("http://localhost:8080") # get text of all the invoices in InvoiceText class we just created above -print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) \ No newline at end of file +print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) diff --git a/docs/examples/qdrant_zendesk/qdrant.py b/docs/examples/qdrant_zendesk/qdrant.py index 300d8dc6ad..bd0cbafc99 100644 --- a/docs/examples/qdrant_zendesk/qdrant.py +++ b/docs/examples/qdrant_zendesk/qdrant.py @@ -10,13 +10,12 @@ from dlt.common.configuration.inject import with_config + # function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk @dlt.source(max_table_nesting=2) def zendesk_support( credentials: Dict[str, str] = dlt.secrets.value, - start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008 - year=2000, month=1, day=1 - ), + start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008 end_date: Optional[TAnyDateTime] = None, ): """ @@ -80,6 +79,7 @@ def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]: return None return ensure_pendulum_datetime(value) + # modify dates to return datetime objects instead def _fix_date(ticket): ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"]) @@ -87,6 +87,7 @@ def _fix_date(ticket): ticket["due_at"] = _parse_date_or_none(ticket["due_at"]) return ticket + # function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk def get_pages( url: str, @@ -127,6 +128,7 @@ def get_pages( if not response_json["end_of_stream"]: get_url = response_json["next_page"] + if __name__ == "__main__": # create a pipeline with an appropriate name pipeline = dlt.pipeline( @@ -146,7 +148,6 @@ def get_pages( print(load_info) - # running the Qdrant client to connect to your Qdrant database @with_config(sections=("destination", "qdrant", "credentials")) diff --git a/docs/examples/transformers/pokemon.py b/docs/examples/transformers/pokemon.py index c17beff6a8..97b9a98b11 100644 --- 
a/docs/examples/transformers/pokemon.py +++ b/docs/examples/transformers/pokemon.py @@ -1,6 +1,7 @@ import dlt from dlt.sources.helpers import requests + @dlt.source(max_table_nesting=2) def source(pokemon_api_url: str): """""" @@ -46,6 +47,7 @@ def species(pokemon_details): return (pokemon_list | pokemon, pokemon_list | pokemon | species) + if __name__ == "__main__": # build duck db pipeline pipeline = dlt.pipeline( @@ -54,4 +56,4 @@ def species(pokemon_details): # the pokemon_list resource does not need to be loaded load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon")) - print(load_info) \ No newline at end of file + print(load_info) diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index 318f330849..9b8d45982d 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -362,6 +362,9 @@ assert list(r) == list(range(10)) > 💡 You cannot limit transformers. They should process all the data they receive fully to avoid > inconsistencies in generated datasets. +> 💡 If you are paremetrizing the value of `add_limit` and sometimes need it to be disabled, you can set `None` or `-1` +> to disable the limiting. You can also set the limit to `0` for the resource to not yield any items. + ### Set table name and adjust schema You can change the schema of a resource, be it standalone or as a part of a source. Look for method diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index a94cf680fa..48f966f831 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -2,6 +2,7 @@ from typing import Iterator import pytest +import asyncio import dlt from dlt.common.configuration.container import Container @@ -789,6 +790,31 @@ def test_limit_infinite_counter() -> None: assert list(r) == list(range(10)) +@pytest.mark.parametrize("limit", (None, -1, 0, 10)) +def test_limit_edge_cases(limit: int) -> None: + r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore + + @dlt.resource() + async def r_async(): + for i in range(20): + await asyncio.sleep(0.01) + yield i + + sync_list = list(r) + async_list = list(r_async().add_limit(limit)) + + # check the expected results + assert sync_list == async_list + if limit == 10: + assert sync_list == list(range(10)) + elif limit in [None, -1]: + assert sync_list == list(range(20)) + elif limit == 0: + assert sync_list == [] + else: + raise AssertionError(f"Unexpected limit: {limit}") + + def test_limit_source() -> None: def mul_c(item): yield from "A" * (item + 2) From 41614a66810ddd25e119d4c444098a44ddbf4d0e Mon Sep 17 00:00:00 2001 From: rudolfix Date: Tue, 5 Mar 2024 20:24:43 +0100 Subject: [PATCH 13/17] adds row_order to incremental (#1041) * helper to chunk iterators * wraps add_limit without evaluating gen * adds row order to incremental * adds method to close pipe early * auto updates docs * merges typing and items in extract * raises when pipe cannot be closed * uses original pipe when re-binding internal incremental, fixes late binding issue with parallelize * cleansup parallelize wrapper * updates incremental docs * updates incremental docs --- dlt/common/schema/typing.py | 2 +- dlt/common/typing.py | 1 + dlt/common/utils.py | 15 +- dlt/common/validation.py | 8 +- .../impl/bigquery/bigquery_adapter.py | 2 +- .../impl/synapse/synapse_adapter.py | 2 +- dlt/extract/concurrency.py | 3 +- dlt/extract/decorators.py | 2 +- dlt/extract/exceptions.py | 15 +- dlt/extract/extractors.py | 2 +- 
dlt/extract/hints.py | 2 +- dlt/extract/incremental/__init__.py | 77 ++++-- dlt/extract/incremental/transform.py | 19 +- dlt/extract/items.py | 211 ++++++++++++++- dlt/extract/pipe.py | 29 +-- dlt/extract/pipe_iterator.py | 4 +- dlt/extract/resource.py | 15 +- dlt/extract/source.py | 2 +- dlt/extract/typing.py | 169 ------------ dlt/extract/utils.py | 37 ++- dlt/extract/validation.py | 2 +- dlt/helpers/airflow_helper.py | 60 ++--- dlt/sources/helpers/transform.py | 2 +- docs/examples/transformers/pokemon.py | 6 +- .../verified-sources/sql_database.md | 57 +++-- .../docs/examples/chess_production/index.md | 2 +- .../docs/examples/transformers/index.md | 2 +- .../docs/general-usage/incremental-loading.md | 241 ++++++++++-------- docs/website/docs/intro.md | 2 +- docs/website/docs/reference/performance.md | 16 +- tests/common/test_validation.py | 18 +- tests/extract/test_decorators.py | 21 +- tests/extract/test_extract_pipe.py | 19 +- tests/extract/test_incremental.py | 124 ++++++++- tests/extract/test_sources.py | 25 +- tests/extract/test_validation.py | 2 +- tests/extract/utils.py | 2 +- tests/sources/helpers/test_requests.py | 23 +- tests/utils.py | 19 ++ 39 files changed, 797 insertions(+), 463 deletions(-) delete mode 100644 dlt/extract/typing.py diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 9cbd7266f2..ec60e4c365 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -18,6 +18,7 @@ from dlt.common.data_types import TDataType from dlt.common.normalizers.typing import TNormalizersConfig +from dlt.common.typing import TSortOrder try: from pydantic import BaseModel as _PydanticBaseModel @@ -71,7 +72,6 @@ TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]] TColumnNames = Union[str, Sequence[str]] """A string representing a column name or a list of""" -TSortOrder = Literal["asc", "desc"] COLUMN_PROPS: Set[TColumnProp] = set(get_args(TColumnProp)) COLUMN_HINTS: Set[TColumnHint] = set( diff --git a/dlt/common/typing.py b/dlt/common/typing.py index 7c20b0df43..05720fe7d9 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -93,6 +93,7 @@ TVariantRV = Tuple[str, Any] VARIANT_FIELD_FORMAT = "v_%s" TFileOrPath = Union[str, os.PathLike, IO[Any]] +TSortOrder = Literal["asc", "desc"] @runtime_checkable diff --git a/dlt/common/utils.py b/dlt/common/utils.py index 9beb4e48bf..4ddde87758 100644 --- a/dlt/common/utils.py +++ b/dlt/common/utils.py @@ -43,9 +43,18 @@ RowCounts = Dict[str, int] -def chunks(seq: Sequence[T], n: int) -> Iterator[Sequence[T]]: - for i in range(0, len(seq), n): - yield seq[i : i + n] +def chunks(iterable: Iterable[T], n: int) -> Iterator[Sequence[T]]: + it = iter(iterable) + while True: + chunk = list() + try: + for _ in range(n): + chunk.append(next(it)) + except StopIteration: + if chunk: + yield chunk + break + yield chunk def uniq_id(len_: int = 16) -> str: diff --git a/dlt/common/validation.py b/dlt/common/validation.py index 134708f9dd..6bf1356aeb 100644 --- a/dlt/common/validation.py +++ b/dlt/common/validation.py @@ -90,7 +90,11 @@ def verify_prop(pk: str, pv: Any, t: Any) -> None: has_passed = True if not has_passed: type_names = [ - str(get_args(ut)) if is_literal_type(ut) else ut.__name__ + ( + str(get_args(ut)) + if is_literal_type(ut) + else getattr(ut, "__name__", str(ut)) + ) for ut in union_types ] raise DictValidationException( @@ -162,7 +166,7 @@ def verify_prop(pk: str, pv: Any, t: Any) -> None: if not validator_f(path, pk, pv, t): # TODO: when Python 3.9 and earlier support is # 
dropped, just __name__ can be used - type_name = getattr(t, "__name__", t.__class__) + type_name = getattr(t, "__name__", str(t)) raise DictValidationException( f"In {path}: field {pk} has expected type {type_name} which lacks validator", path, diff --git a/dlt/destinations/impl/bigquery/bigquery_adapter.py b/dlt/destinations/impl/bigquery/bigquery_adapter.py index 26ca4a3883..1d630e9802 100644 --- a/dlt/destinations/impl/bigquery/bigquery_adapter.py +++ b/dlt/destinations/impl/bigquery/bigquery_adapter.py @@ -9,7 +9,7 @@ ) from dlt.destinations.utils import ensure_resource from dlt.extract import DltResource -from dlt.extract.typing import TTableHintTemplate +from dlt.extract.items import TTableHintTemplate PARTITION_HINT: Literal["x-bigquery-partition"] = "x-bigquery-partition" diff --git a/dlt/destinations/impl/synapse/synapse_adapter.py b/dlt/destinations/impl/synapse/synapse_adapter.py index 24932736f9..8b262f3621 100644 --- a/dlt/destinations/impl/synapse/synapse_adapter.py +++ b/dlt/destinations/impl/synapse/synapse_adapter.py @@ -1,7 +1,7 @@ from typing import Any, Literal, Set, get_args, Final, Dict from dlt.extract import DltResource, resource as make_resource -from dlt.extract.typing import TTableHintTemplate +from dlt.extract.items import TTableHintTemplate from dlt.extract.hints import TResourceHints from dlt.destinations.utils import ensure_resource diff --git a/dlt/extract/concurrency.py b/dlt/extract/concurrency.py index 79c518b697..6a330b2645 100644 --- a/dlt/extract/concurrency.py +++ b/dlt/extract/concurrency.py @@ -10,8 +10,7 @@ from dlt.common.exceptions import PipelineException from dlt.common.configuration.container import Container from dlt.common.runtime.signals import sleep -from dlt.extract.typing import DataItemWithMeta, TItemFuture -from dlt.extract.items import ResolvablePipeItem, FuturePipeItem +from dlt.extract.items import DataItemWithMeta, TItemFuture, ResolvablePipeItem, FuturePipeItem from dlt.extract.exceptions import ( DltSourceException, diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index b3174a6eaf..6e916ff6e1 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -62,7 +62,7 @@ ) from dlt.extract.incremental import IncrementalResourceWrapper -from dlt.extract.typing import TTableHintTemplate +from dlt.extract.items import TTableHintTemplate from dlt.extract.source import DltSource from dlt.extract.resource import DltResource, TUnboundDltResource diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py index de06c67139..c3a20e72e5 100644 --- a/dlt/extract/exceptions.py +++ b/dlt/extract/exceptions.py @@ -1,9 +1,9 @@ -from inspect import Signature, isgenerator +from inspect import Signature, isgenerator, isgeneratorfunction, unwrap from typing import Any, Set, Type from dlt.common.exceptions import DltException from dlt.common.utils import get_callable_name -from dlt.extract.typing import ValidateItem, TDataItems +from dlt.extract.items import ValidateItem, TDataItems class ExtractorException(DltException): @@ -101,6 +101,17 @@ def __init__(self, pipe_name: str, gen: Any) -> None: super().__init__(pipe_name, msg) +class UnclosablePipe(PipeException): + def __init__(self, pipe_name: str, gen: Any) -> None: + type_name = str(type(gen)) + if gen_name := getattr(gen, "__name__", None): + type_name = f"{type_name} ({gen_name})" + msg = f"Pipe with gen of type {type_name} cannot be closed." 
+ if callable(gen) and isgeneratorfunction(unwrap(gen)): + msg += " Closing of partially evaluated transformers is not yet supported." + super().__init__(pipe_name, msg) + + class ResourceNameMissing(DltResourceException): def __init__(self) -> None: super().__init__( diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index f60589c514..84abb4f3a8 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -21,7 +21,7 @@ ) from dlt.extract.hints import HintsMeta from dlt.extract.resource import DltResource -from dlt.extract.typing import TableNameMeta +from dlt.extract.items import TableNameMeta from dlt.extract.storage import ExtractStorage, ExtractorItemStorage try: diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 7f4f54389f..f298e414a1 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -21,7 +21,7 @@ InconsistentTableTemplate, ) from dlt.extract.incremental import Incremental -from dlt.extract.typing import TFunHintTemplate, TTableHintTemplate, ValidateItem +from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, ValidateItem from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint from dlt.extract.validation import create_item_validator diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 24495ccb19..54e8b3d447 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -6,7 +6,6 @@ import inspect from functools import wraps - import dlt from dlt.common.exceptions import MissingDependencyException from dlt.common import pendulum, logger @@ -15,6 +14,7 @@ TDataItem, TDataItems, TFun, + TSortOrder, extract_inner_type, get_generic_type_argument_from_instance, is_optional_type, @@ -36,7 +36,7 @@ ) from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc from dlt.extract.pipe import Pipe -from dlt.extract.typing import SupportsPipe, TTableHintTemplate, ItemTransform +from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform from dlt.extract.incremental.transform import ( JsonIncremental, ArrowIncremental, @@ -87,6 +87,9 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa end_value: Optional value used to load a limited range of records between `initial_value` and `end_value`. Use in conjunction with `initial_value`, e.g. load records from given month `incremental(initial_value="2022-01-01T00:00:00Z", end_value="2022-02-01T00:00:00Z")` Note, when this is set the incremental filtering is stateless and `initial_value` always supersedes any previous incremental value in state. + row_order: Declares that data source returns rows in descending (desc) or ascending (asc) order as defined by `last_value_func`. If row order is know, Incremental class + is able to stop requesting new rows by closing pipe generator. This prevents getting more data from the source. Defaults to None, which means that + row order is not known. allow_external_schedulers: If set to True, allows dlt to look for external schedulers from which it will take "initial_value" and "end_value" resulting in loading only specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class. The values passed explicitly to Incremental will be ignored. 
@@ -98,6 +101,8 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa # TODO: Support typevar here initial_value: Optional[Any] = None end_value: Optional[Any] = None + row_order: Optional[TSortOrder] = None + allow_external_schedulers: bool = False # incremental acting as empty EMPTY: ClassVar["Incremental[Any]"] = None @@ -109,6 +114,7 @@ def __init__( last_value_func: Optional[LastValueFunc[TCursorValue]] = max, primary_key: Optional[TTableHintTemplate[TColumnNames]] = None, end_value: Optional[TCursorValue] = None, + row_order: Optional[TSortOrder] = None, allow_external_schedulers: bool = False, ) -> None: # make sure that path is valid @@ -123,6 +129,7 @@ def __init__( """Value of last_value at the beginning of current pipeline run""" self.resource_name: Optional[str] = None self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key + self.row_order = row_order self.allow_external_schedulers = allow_external_schedulers self._cached_state: IncrementalColumnState = None @@ -135,6 +142,8 @@ def __init__( """Becomes true on the first item that is out of range of `start_value`. I.e. when using `max` this is a value that is lower than `start_value`""" self._transformers: Dict[str, IncrementalTransform] = {} + self._bound_pipe: SupportsPipe = None + """Bound pipe""" @property def primary_key(self) -> Optional[TTableHintTemplate[TColumnNames]]: @@ -171,18 +180,6 @@ def from_existing_state( i.resource_name = resource_name return i - def copy(self) -> "Incremental[TCursorValue]": - # preserve Generic param information - constructor = self.__orig_class__ if hasattr(self, "__orig_class__") else self.__class__ - return constructor( # type: ignore - self.cursor_path, - initial_value=self.initial_value, - last_value_func=self.last_value_func, - primary_key=self._primary_key, - end_value=self.end_value, - allow_external_schedulers=self.allow_external_schedulers, - ) - def merge(self, other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue]": """Create a new incremental instance which merges the two instances. Only properties which are not `None` from `other` override the current instance properties. 
@@ -193,6 +190,7 @@ def merge(self, other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue >>> >>> my_resource(updated=incremental(initial_value='2023-01-01', end_value='2023-02-01')) """ + # func, resource name and primary key are not part of the dict kwargs = dict(self, last_value_func=self.last_value_func, primary_key=self._primary_key) for key, value in dict( other, last_value_func=other.last_value_func, primary_key=other.primary_key @@ -207,7 +205,15 @@ def merge(self, other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue other.__orig_class__ if hasattr(other, "__orig_class__") else other.__class__ ) constructor = extract_inner_type(constructor) - return constructor(**kwargs) # type: ignore + merged = constructor(**kwargs) + merged.resource_name = self.resource_name + if other.resource_name: + merged.resource_name = other.resource_name + return merged # type: ignore + + def copy(self) -> "Incremental[TCursorValue]": + # merge creates a copy + return self.merge(self) def on_resolved(self) -> None: compile_path(self.cursor_path) @@ -246,7 +252,10 @@ def parse_native_representation(self, native_value: Any) -> None: self.initial_value = native_value.initial_value self.last_value_func = native_value.last_value_func self.end_value = native_value.end_value - self.resource_name = self.resource_name + self.resource_name = native_value.resource_name + self._primary_key = native_value._primary_key + self.allow_external_schedulers = native_value.allow_external_schedulers + self.row_order = native_value.row_order else: # TODO: Maybe check if callable(getattr(native_value, '__lt__', None)) # Passing bare value `incremental=44` gets parsed as initial_value self.initial_value = native_value @@ -304,9 +313,15 @@ def last_value(self) -> Optional[TCursorValue]: def _transform_item( self, transformer: IncrementalTransform, row: TDataItem ) -> Optional[TDataItem]: - row, start_out_of_range, end_out_of_range = transformer(row) - self.start_out_of_range = start_out_of_range - self.end_out_of_range = end_out_of_range + row, self.start_out_of_range, self.end_out_of_range = transformer(row) + # if we know that rows are ordered we can close the generator automatically + # mind that closing pipe will not immediately close processing. it only closes the + # generator so this page will be fully processed + # TODO: we cannot close partially evaluated transformer gen. to implement that + # we'd need to pass the source gen along with each yielded item and close this particular gen + # NOTE: with that implemented we could implement add_limit as a regular transform having access to gen + if self.can_close() and not self._bound_pipe.has_parent: + self._bound_pipe.close() return row def get_incremental_value_type(self) -> Type[Any]: @@ -387,6 +402,7 @@ def bind(self, pipe: SupportsPipe) -> "Incremental[TCursorValue]": if self.is_partial(): raise IncrementalCursorPathMissing(pipe.name, None, None) self.resource_name = pipe.name + self._bound_pipe = pipe # try to join external scheduler if self.allow_external_schedulers: self._join_external_scheduler() @@ -401,6 +417,21 @@ def bind(self, pipe: SupportsPipe) -> "Incremental[TCursorValue]": self._make_transforms() return self + def can_close(self) -> bool: + """Checks if incremental is out of range and can be closed. + + Returns true only when `row_order` was set and + 1. results are ordered ascending and are above upper bound (end_value) + 2. 
results are ordered descending and are below or equal lower bound (start_value) + """ + # ordered ascending, check if we cross upper bound + return ( + self.row_order == "asc" + and self.end_out_of_range + or self.row_order == "desc" + and self.start_out_of_range + ) + def __str__(self) -> str: return ( f"Incremental at {id(self)} for resource {self.resource_name} with cursor path:" @@ -453,6 +484,7 @@ def __init__(self, primary_key: Optional[TTableHintTemplate[TColumnNames]] = Non self.primary_key = primary_key self.incremental_state: IncrementalColumnState = None self._allow_external_schedulers: bool = None + self._bound_pipe: SupportsPipe = None @staticmethod def should_wrap(sig: inspect.Signature) -> bool: @@ -526,7 +558,9 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: self._incremental.resolve() # in case of transformers the bind will be called before this wrapper is set: because transformer is called for a first time late in the pipe if self._resource_name: - self._incremental.bind(Pipe(self._resource_name)) + # rebind internal _incremental from wrapper that already holds + # instance of a Pipe + self.bind(None) bound_args.arguments[p.name] = self._incremental return func(*bound_args.args, **bound_args.kwargs) @@ -546,6 +580,9 @@ def allow_external_schedulers(self, value: bool) -> None: self._incremental.allow_external_schedulers = value def bind(self, pipe: SupportsPipe) -> "IncrementalResourceWrapper": + # if pipe is None we are re-binding internal incremental + pipe = pipe or self._bound_pipe + self._bound_pipe = pipe self._resource_name = pipe.name if self._incremental: if self._allow_external_schedulers is not None: diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 6e4bcfb04c..e20617cf63 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -13,7 +13,7 @@ ) from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc from dlt.extract.utils import resolve_column_value -from dlt.extract.typing import TTableHintTemplate +from dlt.extract.items import TTableHintTemplate from dlt.common.schema.typing import TColumnNames try: @@ -109,9 +109,8 @@ def __call__( Returns: Tuple (row, start_out_of_range, end_out_of_range) where row is either the data item or `None` if it is completely filtered out """ - start_out_of_range = end_out_of_range = False if row is None: - return row, start_out_of_range, end_out_of_range + return row, False, False row_value = self.find_cursor_value(row) last_value = self.incremental_state["last_value"] @@ -132,8 +131,7 @@ def __call__( self.last_value_func((row_value, self.end_value)) != self.end_value or self.last_value_func((row_value,)) == self.end_value ): - end_out_of_range = True - return None, start_out_of_range, end_out_of_range + return None, False, True check_values = (row_value,) + ((last_value,) if last_value is not None else ()) new_value = self.last_value_func(check_values) @@ -146,10 +144,10 @@ def __call__( # if unique value exists then use it to deduplicate if unique_value: if unique_value in self.incremental_state["unique_hashes"]: - return None, start_out_of_range, end_out_of_range + return None, False, False # add new hash only if the record row id is same as current last value self.incremental_state["unique_hashes"].append(unique_value) - return row, start_out_of_range, end_out_of_range + return row, False, False # skip the record that is not a last_value or new_value: that record was already processed check_values = 
(row_value,) + ( (self.start_value,) if self.start_value is not None else () @@ -157,17 +155,16 @@ def __call__( new_value = self.last_value_func(check_values) # Include rows == start_value but exclude "lower" if new_value == self.start_value and processed_row_value != self.start_value: - start_out_of_range = True - return None, start_out_of_range, end_out_of_range + return None, True, False else: - return row, start_out_of_range, end_out_of_range + return row, False, False else: self.incremental_state["last_value"] = new_value unique_value = self.unique_value(row, self.primary_key, self.resource_name) if unique_value: self.incremental_state["unique_hashes"] = [unique_value] - return row, start_out_of_range, end_out_of_range + return row, False, False class ArrowIncremental(IncrementalTransform): diff --git a/dlt/extract/items.py b/dlt/extract/items.py index 43e8fda8d5..c6e1f0a4b8 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -1,18 +1,45 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Iterator, NamedTuple, Union, Optional +import inspect +from abc import ABC, abstractmethod +from typing import ( + Any, + Callable, + Generic, + Iterator, + Iterable, + Literal, + Optional, + Protocol, + TypeVar, + Union, + Awaitable, + TYPE_CHECKING, + NamedTuple, + Generator, +) from concurrent.futures import Future -from dlt.common.typing import TDataItems -from dlt.extract.typing import TPipedDataItems, DataItemWithMeta, TItemFuture +from dlt.common.typing import TAny, TDataItem, TDataItems + + +TDecompositionStrategy = Literal["none", "scc"] +TDeferredDataItems = Callable[[], TDataItems] +TAwaitableDataItems = Awaitable[TDataItems] +TPipedDataItems = Union[TDataItems, TDeferredDataItems, TAwaitableDataItems] + +TDynHintType = TypeVar("TDynHintType") +TFunHintTemplate = Callable[[TDataItem], TDynHintType] +TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]] if TYPE_CHECKING: - from dlt.extract.pipe import Pipe + TItemFuture = Future[TPipedDataItems] +else: + TItemFuture = Future class PipeItem(NamedTuple): item: TDataItems step: int - pipe: "Pipe" + pipe: "SupportsPipe" meta: Any @@ -20,19 +47,185 @@ class ResolvablePipeItem(NamedTuple): # mypy unable to handle recursive types, ResolvablePipeItem should take itself in "item" item: Union[TPipedDataItems, Iterator[TPipedDataItems]] step: int - pipe: "Pipe" + pipe: "SupportsPipe" meta: Any class FuturePipeItem(NamedTuple): item: TItemFuture step: int - pipe: "Pipe" + pipe: "SupportsPipe" meta: Any class SourcePipeItem(NamedTuple): item: Union[Iterator[TPipedDataItems], Iterator[ResolvablePipeItem]] step: int - pipe: "Pipe" + pipe: "SupportsPipe" meta: Any + + +# pipeline step may be iterator of data items or mapping function that returns data item or another iterator +TPipeStep = Union[ + Iterable[TPipedDataItems], + Iterator[TPipedDataItems], + # Callable with meta + Callable[[TDataItems, Optional[Any]], TPipedDataItems], + Callable[[TDataItems, Optional[Any]], Iterator[TPipedDataItems]], + Callable[[TDataItems, Optional[Any]], Iterator[ResolvablePipeItem]], + # Callable without meta + Callable[[TDataItems], TPipedDataItems], + Callable[[TDataItems], Iterator[TPipedDataItems]], + Callable[[TDataItems], Iterator[ResolvablePipeItem]], +] + + +class DataItemWithMeta: + __slots__ = "meta", "data" + + meta: Any + data: TDataItems + + def __init__(self, meta: Any, data: TDataItems) -> None: + self.meta = meta + self.data = data + + +class TableNameMeta: + __slots__ = "table_name" + + table_name: 
str + + def __init__(self, table_name: str) -> None: + self.table_name = table_name + + +class SupportsPipe(Protocol): + """A protocol with the core Pipe properties and operations""" + + name: str + """Pipe name which is inherited by a resource""" + parent: "SupportsPipe" + """A parent of the current pipe""" + + @property + def gen(self) -> TPipeStep: + """A data generating step""" + ... + + def __getitem__(self, i: int) -> TPipeStep: + """Get pipe step at index""" + ... + + def __len__(self) -> int: + """Length of a pipe""" + ... + + @property + def has_parent(self) -> bool: + """Checks if pipe is connected to parent pipe from which it takes data items. Connected pipes are created from transformer resources""" + ... + + def close(self) -> None: + """Closes pipe generator""" + ... + + +ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny] +ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny] +ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]] + + +class ItemTransform(ABC, Generic[TAny]): + _f_meta: ItemTransformFunctionWithMeta[TAny] = None + _f: ItemTransformFunctionNoMeta[TAny] = None + + def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None: + # inspect the signature + sig = inspect.signature(transform_f) + # TODO: use TypeGuard here to get rid of type ignore + if len(sig.parameters) == 1: + self._f = transform_f # type: ignore + else: # TODO: do better check + self._f_meta = transform_f # type: ignore + + def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]": + return self + + @abstractmethod + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + """Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. 
Returns None to consume item (filter out)""" + pass + + +class FilterItem(ItemTransform[bool]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[bool] + _f: ItemTransformFunctionNoMeta[bool] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + if self._f_meta: + item = [i for i in item if self._f_meta(i, meta)] + else: + item = [i for i in item if self._f(i)] + if not item: + # item was fully consumed by the filter + return None + return item + else: + if self._f_meta: + return item if self._f_meta(item, meta) else None + else: + return item if self._f(item) else None + + +class MapItem(ItemTransform[TDataItem]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[TDataItem] + _f: ItemTransformFunctionNoMeta[TDataItem] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + if self._f_meta: + return [self._f_meta(i, meta) for i in item] + else: + return [self._f(i) for i in item] + else: + if self._f_meta: + return self._f_meta(item, meta) + else: + return self._f(item) + + +class YieldMapItem(ItemTransform[Iterator[TDataItem]]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[TDataItem] + _f: ItemTransformFunctionNoMeta[TDataItem] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + for i in item: + if self._f_meta: + yield from self._f_meta(i, meta) + else: + yield from self._f(i) + else: + if self._f_meta: + yield from self._f_meta(item, meta) + else: + yield from self._f(item) + + +class ValidateItem(ItemTransform[TDataItem]): + """Base class for validators of data items. + + Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`. + See `PydanticValidator` for possible implementation. 
+ """ + + table_name: str + + def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: + self.table_name = pipe.name + return self diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index dd1f3e4ec5..6517273db5 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -13,10 +13,13 @@ InvalidTransformerGeneratorFunction, ParametrizedResourceUnbound, PipeNotBoundToData, + UnclosablePipe, ) -from dlt.extract.typing import ( +from dlt.extract.items import ( ItemTransform, + ResolvablePipeItem, SupportsPipe, + TPipeStep, TPipedDataItems, ) from dlt.extract.utils import ( @@ -26,21 +29,6 @@ wrap_resource_gen, wrap_async_iterator, ) -from dlt.extract.items import ResolvablePipeItem - -# pipeline step may be iterator of data items or mapping function that returns data item or another iterator -TPipeStep = Union[ - Iterable[TPipedDataItems], - Iterator[TPipedDataItems], - # Callable with meta - Callable[[TDataItems, Optional[Any]], TPipedDataItems], - Callable[[TDataItems, Optional[Any]], Iterator[TPipedDataItems]], - Callable[[TDataItems, Optional[Any]], Iterator[ResolvablePipeItem]], - # Callable without meta - Callable[[TDataItems], TPipedDataItems], - Callable[[TDataItems], Iterator[TPipedDataItems]], - Callable[[TDataItems], Iterator[ResolvablePipeItem]], -] class ForkPipe: @@ -188,6 +176,15 @@ def replace_gen(self, gen: TPipeStep) -> None: assert not self.is_empty self._steps[self._gen_idx] = gen + def close(self) -> None: + """Closes pipe generator""" + gen = self.gen + # NOTE: async generator are wrapped in generators + if inspect.isgenerator(gen): + gen.close() + else: + raise UnclosablePipe(self.name, gen) + def full_pipe(self) -> "Pipe": """Creates a pipe that from the current and all the parent pipes.""" # prevent creating full pipe with unbound heads diff --git a/dlt/extract/pipe_iterator.py b/dlt/extract/pipe_iterator.py index b23a9ae7e4..145b517802 100644 --- a/dlt/extract/pipe_iterator.py +++ b/dlt/extract/pipe_iterator.py @@ -32,11 +32,9 @@ ResourceExtractionError, ) from dlt.extract.pipe import Pipe -from dlt.extract.typing import DataItemWithMeta +from dlt.extract.items import DataItemWithMeta, PipeItem, ResolvablePipeItem, SourcePipeItem from dlt.extract.utils import wrap_async_iterator from dlt.extract.concurrency import FuturesPool -from dlt.extract.items import PipeItem, ResolvablePipeItem, SourcePipeItem - TPipeNextItemMode = Literal["fifo", "round_robin"] diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 7159aff0b1..0fef502112 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -1,5 +1,6 @@ from copy import deepcopy import inspect +from functools import partial from typing import ( AsyncIterable, AsyncIterator, @@ -26,7 +27,7 @@ from dlt.common.utils import flatten_list_or_items, get_callable_name, uniq_id from dlt.extract.utils import wrap_async_iterator, wrap_parallel_iterator -from dlt.extract.typing import ( +from dlt.extract.items import ( DataItemWithMeta, ItemTransformFunc, ItemTransformFunctionWithMeta, @@ -326,8 +327,8 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: count = 0 is_async_gen = False - if inspect.isfunction(gen): - gen = gen() + if callable(gen): + gen = gen() # type: ignore # wrap async gen already here if isinstance(gen, AsyncIterator): @@ -351,7 +352,13 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: # transformers should be limited by their input, so we only limit non-transformers if not self.is_transformer: - self._pipe.replace_gen(_gen_wrap(self._pipe.gen)) + gen = self._pipe.gen + # wrap gen 
directly + if inspect.isgenerator(gen): + self._pipe.replace_gen(_gen_wrap(gen)) + else: + # keep function as function to not evaluate generators before pipe starts + self._pipe.replace_gen(partial(_gen_wrap, gen)) return self def parallelize(self) -> "DltResource": diff --git a/dlt/extract/source.py b/dlt/extract/source.py index 53d7d6fa8c..5d9799e29c 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -23,7 +23,7 @@ ) from dlt.common.utils import graph_find_scc_nodes, flatten_list_or_items, graph_edges_to_nodes -from dlt.extract.typing import TDecompositionStrategy +from dlt.extract.items import TDecompositionStrategy from dlt.extract.pipe_iterator import ManagedPipeIterator from dlt.extract.pipe import Pipe from dlt.extract.hints import DltResourceHints, make_hints diff --git a/dlt/extract/typing.py b/dlt/extract/typing.py deleted file mode 100644 index eadbd449b9..0000000000 --- a/dlt/extract/typing.py +++ /dev/null @@ -1,169 +0,0 @@ -import inspect -from abc import ABC, abstractmethod -from typing import ( - Any, - Callable, - Generic, - Iterator, - Literal, - Optional, - Protocol, - TypeVar, - Union, - Awaitable, - TYPE_CHECKING, - Generator, -) -from concurrent.futures import Future - -from dlt.common.typing import TAny, TDataItem, TDataItems - - -TDecompositionStrategy = Literal["none", "scc"] -TDeferredDataItems = Callable[[], TDataItems] -TAwaitableDataItems = Awaitable[TDataItems] -TPipedDataItems = Union[TDataItems, TDeferredDataItems, TAwaitableDataItems] - -TDynHintType = TypeVar("TDynHintType") -TFunHintTemplate = Callable[[TDataItem], TDynHintType] -TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]] - - -class DataItemWithMeta: - __slots__ = "meta", "data" - - meta: Any - data: TDataItems - - def __init__(self, meta: Any, data: TDataItems) -> None: - self.meta = meta - self.data = data - - -class TableNameMeta: - __slots__ = "table_name" - - table_name: str - - def __init__(self, table_name: str) -> None: - self.table_name = table_name - - -class SupportsPipe(Protocol): - """A protocol with the core Pipe properties and operations""" - - name: str - """Pipe name which is inherited by a resource""" - parent: "SupportsPipe" - """A parent of the current pipe""" - - @property - def has_parent(self) -> bool: - """Checks if pipe is connected to parent pipe from which it takes data items. Connected pipes are created from transformer resources""" - ... - - -ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny] -ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny] -ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]] - - -class ItemTransform(ABC, Generic[TAny]): - _f_meta: ItemTransformFunctionWithMeta[TAny] = None - _f: ItemTransformFunctionNoMeta[TAny] = None - - def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None: - # inspect the signature - sig = inspect.signature(transform_f) - # TODO: use TypeGuard here to get rid of type ignore - if len(sig.parameters) == 1: - self._f = transform_f # type: ignore - else: # TODO: do better check - self._f_meta = transform_f # type: ignore - - def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]": - return self - - @abstractmethod - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - """Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. 
Returns None to consume item (filter out)""" - pass - - -class FilterItem(ItemTransform[bool]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[bool] - _f: ItemTransformFunctionNoMeta[bool] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - if self._f_meta: - item = [i for i in item if self._f_meta(i, meta)] - else: - item = [i for i in item if self._f(i)] - if not item: - # item was fully consumed by the filter - return None - return item - else: - if self._f_meta: - return item if self._f_meta(item, meta) else None - else: - return item if self._f(item) else None - - -class MapItem(ItemTransform[TDataItem]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[TDataItem] - _f: ItemTransformFunctionNoMeta[TDataItem] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - if self._f_meta: - return [self._f_meta(i, meta) for i in item] - else: - return [self._f(i) for i in item] - else: - if self._f_meta: - return self._f_meta(item, meta) - else: - return self._f(item) - - -class YieldMapItem(ItemTransform[Iterator[TDataItem]]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[TDataItem] - _f: ItemTransformFunctionNoMeta[TDataItem] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - for i in item: - if self._f_meta: - yield from self._f_meta(i, meta) - else: - yield from self._f(i) - else: - if self._f_meta: - yield from self._f_meta(item, meta) - else: - yield from self._f(item) - - -class ValidateItem(ItemTransform[TDataItem]): - """Base class for validators of data items. - - Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`. - See `PydanticValidator` for possible implementation. 
- """ - - table_name: str - - def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: - self.table_name = pipe.name - return self - - -if TYPE_CHECKING: - TItemFuture = Future[Union[TDataItems, DataItemWithMeta]] -else: - TItemFuture = Future diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index a5c22dad7a..69edcab93d 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -2,6 +2,7 @@ import makefun import asyncio from typing import ( + Callable, Optional, Tuple, Union, @@ -28,7 +29,7 @@ InvalidResourceDataTypeFunctionNotAGenerator, InvalidStepFunctionArguments, ) -from dlt.extract.typing import ( +from dlt.extract.items import ( TTableHintTemplate, TDataItem, TFunHintTemplate, @@ -181,16 +182,10 @@ async def run() -> TDataItems: def wrap_parallel_iterator(f: TAnyFunOrGenerator) -> TAnyFunOrGenerator: """Wraps a generator for parallel extraction""" - def _wrapper(*args: Any, **kwargs: Any) -> Generator[TDataItems, None, None]: - is_generator = True + def _gen_wrapper(*args: Any, **kwargs: Any) -> Iterator[TDataItems]: gen: TAnyFunOrGenerator if callable(f): - if inspect.isgeneratorfunction(inspect.unwrap(f)): - gen = f(*args, **kwargs) - else: - # Function is a transformer that returns values - is_generator = False - gen = f + gen = f(*args, **kwargs) else: gen = f @@ -201,10 +196,7 @@ def _parallel_gen() -> TDataItems: nonlocal busy nonlocal exhausted try: - if is_generator: - return next(gen) # type: ignore[call-overload] - else: - return gen(*args, **kwargs) # type: ignore[operator] + return next(gen) # type: ignore[call-overload] except StopIteration: exhausted = True return None @@ -216,17 +208,24 @@ def _parallel_gen() -> TDataItems: while busy: yield None busy = True - if not is_generator: # Regular function is only resolved once - exhausted = True yield _parallel_gen except GeneratorExit: - if is_generator: - gen.close() # type: ignore[attr-defined] + gen.close() # type: ignore[attr-defined] raise if callable(f): - return wraps(f)(_wrapper) # type: ignore[arg-type] - return _wrapper() + if inspect.isgeneratorfunction(inspect.unwrap(f)): + return wraps(f)(_gen_wrapper) # type: ignore[arg-type] + else: + + def _fun_wrapper(*args: Any, **kwargs: Any) -> Any: + def _curry() -> Any: + return f(*args, **kwargs) + + return _curry + + return wraps(f)(_fun_wrapper) # type: ignore[arg-type] + return _gen_wrapper() # type: ignore[return-value] def wrap_compat_transformer( diff --git a/dlt/extract/validation.py b/dlt/extract/validation.py index 72b70c5661..504eee1bfc 100644 --- a/dlt/extract/validation.py +++ b/dlt/extract/validation.py @@ -8,7 +8,7 @@ from dlt.common.typing import TDataItems from dlt.common.schema.typing import TAnySchemaColumns, TSchemaContract, TSchemaEvolutionMode -from dlt.extract.typing import TTableHintTemplate, ValidateItem +from dlt.extract.items import TTableHintTemplate, ValidateItem _TPydanticModel = TypeVar("_TPydanticModel", bound=PydanticBaseModel) diff --git a/dlt/helpers/airflow_helper.py b/dlt/helpers/airflow_helper.py index c573eaf5cb..9a6616e9ea 100644 --- a/dlt/helpers/airflow_helper.py +++ b/dlt/helpers/airflow_helper.py @@ -48,28 +48,6 @@ ) -def task_name(pipeline: Pipeline, data: Any) -> str: - """Generate a task name. - - Args: - pipeline (Pipeline): The pipeline to run. - data (Any): The data to run the pipeline with. - - Returns: - str: The name of the task. 
- """ - task_name = pipeline.pipeline_name - - if isinstance(data, DltSource): - resource_names = list(data.selected_resources.keys()) - task_name = data.name + "_" + "-".join(resource_names[:4]) - - if len(resource_names) > 4: - task_name += f"-{len(resource_names)-4}-more" - - return task_name - - class PipelineTasksGroup(TaskGroup): """ Represents a dlt Airflow pipeline task group. @@ -197,7 +175,7 @@ def run( schema_contract=schema_contract, pipeline_name=pipeline_name, ) - return PythonOperator(task_id=task_name(pipeline, data), python_callable=f, **kwargs) + return PythonOperator(task_id=_task_name(pipeline, data), python_callable=f, **kwargs) def _run( self, @@ -385,7 +363,7 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator pipeline_name=name, ) return PythonOperator( - task_id=task_name(pipeline, data), python_callable=f, **kwargs + task_id=_task_name(pipeline, data), python_callable=f, **kwargs ) if decompose == "none": @@ -415,7 +393,7 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator tasks = [] sources = data.decompose("scc") - t_name = task_name(pipeline, data) + t_name = _task_name(pipeline, data) start = make_task(pipeline, sources[0]) # parallel tasks @@ -456,16 +434,16 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator start = make_task( pipeline, sources[0], - naming.normalize_identifier(task_name(pipeline, sources[0])), + naming.normalize_identifier(_task_name(pipeline, sources[0])), ) # parallel tasks for source in sources[1:]: # name pipeline the same as task - new_pipeline_name = naming.normalize_identifier(task_name(pipeline, source)) + new_pipeline_name = naming.normalize_identifier(_task_name(pipeline, source)) tasks.append(make_task(pipeline, source, new_pipeline_name)) - t_name = task_name(pipeline, data) + t_name = _task_name(pipeline, data) end = DummyOperator(task_id=f"{t_name}_end") if tasks: @@ -480,10 +458,6 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator " 'parallel-isolated']" ) - def add_fun(self, f: Callable[..., Any], **kwargs: Any) -> Any: - """Will execute a function `f` inside an Airflow task. It is up to the function to create pipeline and source(s)""" - raise NotImplementedError() - def airflow_get_execution_dates() -> Tuple[pendulum.DateTime, Optional[pendulum.DateTime]]: # prefer logging to task logger @@ -494,3 +468,25 @@ def airflow_get_execution_dates() -> Tuple[pendulum.DateTime, Optional[pendulum. return context["data_interval_start"], context["data_interval_end"] except Exception: return None, None + + +def _task_name(pipeline: Pipeline, data: Any) -> str: + """Generate a task name. + + Args: + pipeline (Pipeline): The pipeline to run. + data (Any): The data to run the pipeline with. + + Returns: + str: The name of the task. 
+ """ + task_name = pipeline.pipeline_name + + if isinstance(data, DltSource): + resource_names = list(data.selected_resources.keys()) + task_name = data.name + "_" + "-".join(resource_names[:4]) + + if len(resource_names) > 4: + task_name += f"-{len(resource_names)-4}-more" + + return task_name diff --git a/dlt/sources/helpers/transform.py b/dlt/sources/helpers/transform.py index 1975c20586..3949823be7 100644 --- a/dlt/sources/helpers/transform.py +++ b/dlt/sources/helpers/transform.py @@ -1,5 +1,5 @@ from dlt.common.typing import TDataItem -from dlt.extract.typing import ItemTransformFunctionNoMeta +from dlt.extract.items import ItemTransformFunctionNoMeta def take_first(max_items: int) -> ItemTransformFunctionNoMeta[bool]: diff --git a/docs/examples/transformers/pokemon.py b/docs/examples/transformers/pokemon.py index 97b9a98b11..ca32c570ef 100644 --- a/docs/examples/transformers/pokemon.py +++ b/docs/examples/transformers/pokemon.py @@ -29,15 +29,13 @@ def _get_pokemon(_pokemon): # a special case where just one item is retrieved in transformer # a whole transformer may be marked for parallel execution - @dlt.transformer - @dlt.defer + @dlt.transformer(parallelized=True) def species(pokemon_details): """Yields species details for a pokemon""" species_data = requests.get(pokemon_details["species"]["url"]).json() # link back to pokemon so we have a relation in loaded data species_data["pokemon_id"] = pokemon_details["id"] - # just return the results, if you yield, - # generator will be evaluated in main thread + # You can return the result instead of yield since the transformer only generates one result return species_data # create two simple pipelines with | operator diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index 3f0532e9d2..67965863ce 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -215,6 +215,10 @@ def sql_database( schema: Optional[str] = dlt.config.value, metadata: Optional[MetaData] = None, table_names: Optional[List[str]] = dlt.config.value, + chunk_size: int = 1000, + detect_precision_hints: Optional[bool] = dlt.config.value, + defer_table_reflect: Optional[bool] = dlt.config.value, + table_adapter_callback: Callable[[Table], None] = None, ) -> Iterable[DltResource]: ``` @@ -226,6 +230,16 @@ def sql_database( `table_names`: List of tables to load; defaults to all if not provided. +`chunk_size`: Number of records in a batch. Internally SqlAlchemy maintains a buffer twice that size + +`detect_precision_hints`: Infers full schema for columns including data type, precision and scale + +`defer_table_reflect`: Will connect to the source database and reflect the tables +only at runtime. Use when running on Airflow + +`table_adapter_callback`: A callback with SQLAlchemy `Table` where you can, for example, +remove certain columns to be selected. + ### Resource `sql_table` This function loads data from specific database tables. @@ -240,21 +254,18 @@ def sql_table( schema: Optional[str] = dlt.config.value, metadata: Optional[MetaData] = None, incremental: Optional[dlt.sources.incremental[Any]] = None, + chunk_size: int = 1000, + detect_precision_hints: Optional[bool] = dlt.config.value, + defer_table_reflect: Optional[bool] = dlt.config.value, + table_adapter_callback: Callable[[Table], None] = None, ) -> DltResource: ``` - -`credentials`: Database info or an Engine instance. 
- -`table`: Table to load, set in code or default from "config.toml". - -`schema`: Optional name of the table schema. - -`metadata`: Optional SQLAlchemy.MetaData; takes precedence over schema. - `incremental`: Optional, enables incremental loading. `write_disposition`: Can be "merge", "replace", or "append". +for other arguments, see `sql_database` source above. + ## Incremental Loading Efficient data management often requires loading only new or updated data from your SQL databases, rather than reprocessing the entire dataset. This is where incremental loading comes into play. @@ -264,15 +275,13 @@ Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing I ### Configuring Incremental Loading 1. **Choose a Cursor Column**: Identify a column in your SQL table that can serve as a reliable indicator of new or updated rows. Common choices include timestamp columns or auto-incrementing IDs. 1. **Set an Initial Value**: Choose a starting value for the cursor to begin loading data. This could be a specific timestamp or ID from which you wish to start loading data. -1. **Apply Incremental Configuration**: Enable incremental loading with your configuration's `incremental` argument. 1. **Deduplication**: When using incremental loading, the system automatically handles the deduplication of rows based on the primary key (if available) or row hash for tables without a primary key. - -:::note -Incorporating incremental loading into your SQL data pipelines can significantly enhance performance by minimizing unnecessary data processing and transfer. -::: +1. **Set end_value for backfill**: Set `end_value` if you want to backfill data from +certain range. +1. **Order returned rows**. Set `row_order` to `asc` or `desc` to order returned rows. #### Incremental Loading Example -1. Consider a table with a `last_modified` timestamp column. By setting this column as your cursor and specifying an +1. Consider a table with a `last_modified` timestamp column. By setting this column as your cursor and specifying an initial value, the loader generates a SQL query filtering rows with `last_modified` values greater than the specified initial value. ```python @@ -288,8 +297,8 @@ Incorporating incremental loading into your SQL data pipelines can significantly ) ) - info = pipeline.extract(table, write_disposition="merge") - print(info) + info = pipeline.extract(table, write_disposition="merge") + print(info) ``` 1. To incrementally load the "family" table using the sql_database source method: @@ -325,6 +334,20 @@ Incorporating incremental loading into your SQL data pipelines can significantly * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appendend, or replaced resources. ::: +### Run on Airflow +When running on Airflow +1. Use `dlt` [Airflow Helper](../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from `sql_database` source. You should be able to run table extraction in parallel with `parallel-isolated` source->DAG conversion. +2. Reflect tables at runtime with `defer_table_reflect` argument. +3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading). 
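As a minimal sketch of how the new `sql_database` arguments fit together (the local `sql_database` import and the public Rfam MySQL instance are assumptions for illustration, not taken from the hunk above):

```python
import dlt
from sqlalchemy import Table

# assumption: the verified source is available locally as the `sql_database` module
from sql_database import sql_database


def table_adapter(table: Table) -> None:
    # inspect (or tweak) the reflected table before rows are selected
    print(f"reflected {table.name} with {len(table.columns)} columns")


source = sql_database(
    credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["family", "clan"],
    chunk_size=5000,  # rows per yielded batch
    defer_table_reflect=True,  # reflect tables only at runtime, e.g. on Airflow
    table_adapter_callback=table_adapter,
)

pipeline = dlt.pipeline(
    pipeline_name="rfam", destination="duckdb", dataset_name="rfam_data"
)
print(pipeline.run(source))
```

The table and pipeline names here are placeholders; the point is that `chunk_size`, `defer_table_reflect` and `table_adapter_callback` are plain keyword arguments of the source and can be combined freely.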
+ +### Parallel extraction +You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. +```python +database = sql_database().parallelize() +table = sql_table().parallelize() +``` + + ### Troubleshooting If you encounter issues where the expected WHERE clause for incremental loading is not generated, ensure your configuration aligns with the `sql_table` resource rather than applying hints post-resource creation. This ensures the loader generates the correct query for incremental loading. diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index ea747288e5..d80558e745 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -27,7 +27,7 @@ We'll learn how to: ### Init chess source -```python +```py import threading from typing import Any, Iterator diff --git a/docs/website/docs/examples/transformers/index.md b/docs/website/docs/examples/transformers/index.md index 056ba429b4..2f5c3dd532 100644 --- a/docs/website/docs/examples/transformers/index.md +++ b/docs/website/docs/examples/transformers/index.md @@ -29,7 +29,7 @@ We'll learn how to: ### Loading code -```python +```py import dlt from dlt.sources.helpers import requests diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 7e4021214e..dd52c9c750 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -192,7 +192,7 @@ def resource(): yield [ {"id": 2, "val": "foo", "lsn": 1, "deleted_flag": False}, {"id": 2, "lsn": 2, "deleted_flag": True} - ] + ] ... ``` @@ -267,7 +267,7 @@ In essence, `dlt.sources.incremental` instance above * **updated_at.initial_value** which is always equal to "1970-01-01T00:00:00Z" passed in constructor * **updated_at.start_value** a maximum `updated_at` value from the previous run or the **initial_value** on first run * **updated_at.last_value** a "real time" `updated_at` value updated with each yielded item or page. before first yield it equals **start_value** -* **updated_at.end_value** (here not used) [marking end of backfill range](#using-dltsourcesincremental-for-backfill) +* **updated_at.end_value** (here not used) [marking end of backfill range](#using-end_value-for-backfill) When paginating you probably need **start_value** which does not change during the execution of the resource, however most paginators will return a **next page** link which you should use. @@ -284,31 +284,21 @@ duplicates and past issues. 
# use naming function in table name to generate separate tables for each event @dlt.resource(primary_key="id", table_name=lambda i: i['type']) # type: ignore def repo_events( - last_created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z", last_value_func=max) + last_created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z", last_value_func=max), row_order="desc" ) -> Iterator[TDataItems]: repos_path = "/repos/%s/%s/events" % (urllib.parse.quote(owner), urllib.parse.quote(name)) for page in _get_rest_pages(access_token, repos_path + "?per_page=100"): yield page - - # ---> part below is an optional optimization - # Stop requesting more pages when we encounter an element that - # is older than the incremental value at the beginning of the run. - # The start_out_of_range boolean flag is set in this case - if last_created_at.start_out_of_range: - break ``` We just yield all the events and `dlt` does the filtering (using `id` column declared as `primary_key`). -As an optimization we stop requesting more pages once the incremental value is out of range, -in this case that means we got an element which has a smaller `created_at` than the the `last_created_at.start_value`. -The `start_out_of_range` boolean flag is set when the first such element is yielded from the resource, and -since we know that github returns results ordered from newest to oldest, we know that all subsequent -items will be filtered out anyway and there's no need to fetch more data. + +Github returns events ordered from newest to oldest so we declare the `rows_order` as **descending** to [stop requesting more pages once the incremental value is out of range](#declare-row-order-to-not-request-unnecessary-data). We stop requesting more data from the API after finding first event with `created_at` earlier than `initial_value`. ### max, min or custom `last_value_func` -`dlt.sources.incremental` allows to choose a function that orders (compares) values coming from the items to current `last_value`. +`dlt.sources.incremental` allows to choose a function that orders (compares) cursor values to current `last_value`. * The default function is built-in `max` which returns bigger value of the two * Another built-in `min` returns smaller value. @@ -341,9 +331,134 @@ def get_events(last_created_at = dlt.sources.incremental("$", last_value_func=by yield json.load(f) ``` +### Using `end_value` for backfill +You can specify both initial and end dates when defining incremental loading. Let's go back to our Github example: +```python +@dlt.resource(primary_key="id") +def repo_issues( + access_token, + repository, + created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z", end_value="2022-07-01T00:00:00Z") +): + # get issues from created from last "created_at" value + for page in _get_issues_page(access_token, repository, since=created_at.start_value, until=created_at.end_value): + yield page +``` +Above we use `initial_value` and `end_value` arguments of the `incremental` to define the range of issues that we want to retrieve +and pass this range to the Github API (`since` and `until`). As in the examples above, `dlt` will make sure that only the issues from +defined range are returned. + +Please note that when `end_date` is specified, `dlt` **will not modify the existing incremental state**. The backfill is **stateless** and: +1. You can run backfill and incremental load in parallel (ie. in Airflow DAG) in a single pipeline. +2. 
You can partition your backfill into several smaller chunks and run them in parallel as well. + +To define specific ranges to load, you can simply override the incremental argument in the resource, for example: + +```python +july_issues = repo_issues( + created_at=dlt.sources.incremental( + initial_value='2022-07-01T00:00:00Z', end_value='2022-08-01T00:00:00Z' + ) +) +august_issues = repo_issues( + created_at=dlt.sources.incremental( + initial_value='2022-08-01T00:00:00Z', end_value='2022-09-01T00:00:00Z' + ) +) +... +``` + +Note that `dlt`'s incremental filtering considers the ranges half closed. `initial_value` is inclusive, `end_value` is exclusive, so chaining ranges like above works without overlaps. + + +### Declare row order to not request unnecessary data +With `row_order` argument set, `dlt` will stop getting data from the data source (ie. Github API) if it detect that values of cursor field are out of range of **start** and **end** values. + +In particular: +* `dlt` stops processing when the resource yields any item with an _equal or greater_ cursor value than the `end_value` and `row_order` is set to **asc**. (`end_value` is not included) +* `dlt` stops processing when the resource yields any item with a _lower_ cursor value than the `last_value` and `row_order` is set to **desc**. (`last_value` is included) + +:::note +"higher" and "lower" here refers to when the default `last_value_func` is used (`max()`), +when using `min()` "higher" and "lower" are inverted. +::: + +:::caution +If you use `row_order`, **make sure that the data source returns ordered records** (ascending / descending) on the cursor field, +e.g. if an API returns results both higher and lower +than the given `end_value` in no particular order, data reading stops and you'll miss the data items that were out of order. +::: + +Row order is the most useful when: + +1. The data source does **not** offer start/end filtering of results (e.g. there is no `start_time/end_time` query parameter or similar) +2. The source returns results **ordered by the cursor field** + +The github events example is exactly such case. The results are ordered on cursor value descending but there's no way to tell API to limit returned items to those created before certain date. Without the `row_order` setting, we'd be getting all events, each time we extract the `github_events` resource. + +In the same fashion the `row_order` can be used to **optimize backfill** so we don't continue +making unnecessary API requests after the end of range is reached. For example: + +```python +@dlt.resource(primary_key="id") +def tickets( + zendesk_client, + updated_at=dlt.sources.incremental( + "updated_at", + initial_value="2023-01-01T00:00:00Z", + end_value="2023-02-01T00:00:00Z", + row_order="asc" + ), +): + for page in zendesk_client.get_pages( + "/api/v2/incremental/tickets", "tickets", start_time=updated_at.start_value + ): + yield page +``` + +In this example we're loading tickets from Zendesk. The Zendesk API yields items paginated and ordered by oldest to newest, +but only offers a `start_time` parameter for filtering so we cannot tell it to +stop getting data at `end_value`. Instead we set `row_order` to `asc` and `dlt` wil stop +getting more pages from API after first page with cursor value `updated_at` is found older +than `end_value`. + +:::caution +In rare cases when you use Incremental with a transformer, `dlt` will not be able to automatically close +generator associated with a row that is out of range. 
You can still use still call `can_close()` method on +incremental and exit yield loop when true. +::: + +:::tip +The `dlt.sources.incremental` instance provides `start_out_of_range` and `end_out_of_range` +attributes which are set when the resource yields an element with a higher/lower cursor value than the +initial or end values. If you do not want `dlt` to stop processing automatically and instead to handle such events yourself, do not specify `row_order`: +```python +@dlt.transformer(primary_key="id") +def tickets( + zendesk_client, + updated_at=dlt.sources.incremental( + "updated_at", + initial_value="2023-01-01T00:00:00Z", + end_value="2023-02-01T00:00:00Z", + row_order="asc" + ), +): + for page in zendesk_client.get_pages( + "/api/v2/incremental/tickets", "tickets", start_time=updated_at.start_value + ): + yield page + # Stop loading when we reach the end value + if updated_at.end_out_of_range: + return + +``` +::: + ### Deduplication primary_key -`dlt.sources.incremental` let's you optionally set a `primary_key` that is used exclusively to +`dlt.sources.incremental` will inherit the primary key that is set on the resource. + + let's you optionally set a `primary_key` that is used exclusively to deduplicate and which does not become a table hint. The same setting lets you disable the deduplication altogether when empty tuple is passed. Below we pass `primary_key` directly to `incremental` to disable deduplication. That overrides `delta` primary_key set in the resource: @@ -395,45 +510,6 @@ is created. That prevents `dlt` from controlling the **created** argument during result in `IncrementalUnboundError` exception. ::: -### Using `dlt.sources.incremental` for backfill -You can specify both initial and end dates when defining incremental loading. Let's go back to our Github example: -```python -@dlt.resource(primary_key="id") -def repo_issues( - access_token, - repository, - created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z", end_value="2022-07-01T00:00:00Z") -): - # get issues from created from last "created_at" value - for page in _get_issues_page(access_token, repository, since=created_at.start_value, until=created_at.end_value): - yield page -``` -Above we use `initial_value` and `end_value` arguments of the `incremental` to define the range of issues that we want to retrieve -and pass this range to the Github API (`since` and `until`). As in the examples above, `dlt` will make sure that only the issues from -defined range are returned. - -Please note that when `end_date` is specified, `dlt` **will not modify the existing incremental state**. The backfill is **stateless** and: -1. You can run backfill and incremental load in parallel (ie. in Airflow DAG) in a single pipeline. -2. You can partition your backfill into several smaller chunks and run them in parallel as well. - -To define specific ranges to load, you can simply override the incremental argument in the resource, for example: - -```python -july_issues = repo_issues( - created_at=dlt.sources.incremental( - initial_value='2022-07-01T00:00:00Z', end_value='2022-08-01T00:00:00Z' - ) -) -august_issues = repo_issues( - created_at=dlt.sources.incremental( - initial_value='2022-08-01T00:00:00Z', end_value='2022-09-01T00:00:00Z' - ) -) -... -``` - -Note that `dlt`'s incremental filtering considers the ranges half closed. `initial_value` is inclusive, `end_value` is exclusive, so chaining ranges like above works without overlaps. 
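A minimal sketch of the deduplication override described in this hunk, assuming a toy resource with `delta` as its primary key (illustrative only, not the docs' own snippet):

```python
import dlt


@dlt.resource(primary_key="delta")
def some_data(
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value=0,
        primary_key=(),  # empty tuple keeps "delta" as the table hint but disables deduplication
    )
):
    yield [{"delta": i, "created_at": i} for i in range(3)]
```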
- ### Using Airflow schedule for backfill and incremental loading When [running in Airflow task](../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file), you can opt-in your resource to get the `initial_value`/`start_value` and `end_value` from Airflow schedule associated with your DAG. Let's assume that **Zendesk tickets** resource contains a year of data with thousands of tickets. We want to backfill the last year of data week by week and then continue incremental loading daily. ```python @@ -527,59 +603,6 @@ Before `dlt` starts executing incremental resources, it looks for `data_interval You can run DAGs manually but you must remember to specify the Airflow logical date of the run in the past (use Run with config option). For such run `dlt` will load all data from that past date until now. If you do not specify the past date, a run with a range (now, now) will happen yielding no data. -### Using `start/end_out_of_range` flags with incremental resources - -The `dlt.sources.incremental` instance provides `start_out_of_range` and `end_out_of_range` -attributes which are set when the resource yields an element with a higher/lower cursor value than the -initial or end values. -This makes it convenient to optimize resources in some cases. - -* `start_out_of_range` is `True` when the resource yields any item with a _lower_ cursor value than the `initial_value` -* `end_out_of_range` is `True` when the resource yields any item with an equal or _higher_ cursor value than the `end_value` - -**Note**: "higher" and "lower" here refers to when the default `last_value_func` is used (`max()`), -when using `min()` "higher" and "lower" are inverted. - -You can use these flags when both: - -1. The source does **not** offer start/end filtering of results (e.g. there is no `start_time/end_time` query parameter or similar) -2. The source returns results **ordered by the cursor field** - -:::caution -If you use those flags, **make sure that the data source returns record ordered** (ascending / descending) on the cursor field, -e.g. if an API returns results both higher and lower -than the given `end_value` in no particular order, the `end_out_of_range` flag can be `True` but you'll still want to keep loading. -::: - -The github events example above demonstrates how to use `start_out_of_range` as a stop condition. -This approach works in any case where the API returns items in descending order and we're incrementally loading newer data. - -In the same fashion the `end_out_of_range` filter can be used to optimize backfill so we don't continue -making unnecessary API requests after the end of range is reached. For example: - -```python -@dlt.resource(primary_key="id") -def tickets( - zendesk_client, - updated_at=dlt.sources.incremental( - "updated_at", - initial_value="2023-01-01T00:00:00Z", - end_value="2023-02-01T00:00:00Z", - ), -): - for page in zendesk_client.get_pages( - "/api/v2/incremental/tickets", "tickets", start_time=updated_at.start_value - ): - yield page - - # Optimization: Stop loading when we reach the end value - if updated_at.end_out_of_range: - return -``` - -In this example we're loading tickets from Zendesk. The Zendesk API yields items paginated and ordered by oldest to newest, -but only offers a `start_time` parameter for filtering. The incremental `end_out_of_range` flag is set on the first item which -has a timestamp equal or higher than `end_value`. All subsequent items get filtered out so there's no need to request more data. 
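A minimal sketch of opting a resource into the Airflow schedule, assuming the `allow_external_schedulers` flag referenced earlier in this patch (resource name and values are illustrative):

```python
import dlt


@dlt.resource(primary_key="id")
def tickets(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2023-01-01T00:00:00Z",
        # when run inside an Airflow task, take start/end values
        # from the DAG's data interval instead of pipeline state
        allow_external_schedulers=True,
    )
):
    yield [{"id": 1, "updated_at": "2023-01-02T00:00:00Z"}]
```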
## Doing a full refresh diff --git a/docs/website/docs/intro.md b/docs/website/docs/intro.md index 27b896ba2a..6df0dad82d 100644 --- a/docs/website/docs/intro.md +++ b/docs/website/docs/intro.md @@ -141,7 +141,7 @@ from sqlalchemy import create_engine # Use any SQL database supported by SQLAlchemy, below we use a public # MySQL instance to get data. -# NOTE: you'll need to install pymysql with `pip install sqlalchemy pymysql` +# NOTE: you'll need to install pymysql with `pip install pymysql` # NOTE: loading data from public mysql instance may take several seconds engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index 42b042324b..7c095b53d4 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -12,7 +12,7 @@ If you can, yield pages when producing data. This makes some processes more effe the necessary function calls (each chunk of data that you yield goes through the extract pipeline once so if you yield a chunk of 10.000 items you will gain significant savings) For example: -```python +```py import dlt def get_rows(limit): @@ -27,7 +27,7 @@ def database_cursor(): can be replaced with: -```python +```py from itertools import islice @dlt.resource @@ -149,7 +149,7 @@ Consider an example source which consists of 2 resources fetching pages of items The `parallelized=True` argument wraps the resources in a generator that yields callables to evaluate each generator step. These callables are executed in the thread pool. Transformer that are not generators (as shown in the example) are internally wrapped in a generator that yields once. -```python +```py import dlt import time from threading import currentThread @@ -222,7 +222,7 @@ workers=4 The example below does the same but using an async generator as the main resource and async/await and futures pool for the transformer. The `parallelized` flag is not supported or needed for async generators, these are wrapped and evaluated concurrently by default: -```python +```py import asyncio @dlt.resource @@ -350,7 +350,7 @@ workers=11 -```pyhon +```py import os import dlt from itertools import islice @@ -424,7 +424,7 @@ You can run several pipeline instances in parallel from a single process by plac separate threads. The most straightforward way is to use `ThreadPoolExecutor` and `asyncio` to execute pipeline methods. 
-```python +```py import asyncio import dlt from time import sleep @@ -468,9 +468,9 @@ async def _run_async(): asyncio.run(_run_async()) # activate pipelines before they are used pipeline_1.activate() -# assert load_data_table_counts(pipeline_1) == {"async_table": 10} +assert pipeline_1.last_trace.last_normalize_info.row_counts["async_table"] == 10 pipeline_2.activate() -# assert load_data_table_counts(pipeline_2) == {"defer_table": 5} +assert pipeline_2.last_trace.last_normalize_info.row_counts["defer_table"] == 5 ``` diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py index 4a123ae72c..3fff3bf2ea 100644 --- a/tests/common/test_validation.py +++ b/tests/common/test_validation.py @@ -6,13 +6,17 @@ from dlt.common.exceptions import DictValidationException from dlt.common.schema.typing import TStoredSchema, TColumnSchema from dlt.common.schema.utils import simple_regex_validator -from dlt.common.typing import DictStrStr, StrStr +from dlt.common.typing import DictStrStr, StrStr, TDataItem from dlt.common.validation import validate_dict, validate_dict_ignoring_xkeys -from dlt.extract.typing import TDataItem, TTableHintTemplate TLiteral = Literal["uno", "dos", "tres"] +# some typevars for testing +TDynHintType = TypeVar("TDynHintType") +TFunHintTemplate = Callable[[TDataItem], TDynHintType] +TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]] + class TDict(TypedDict): field: TLiteral @@ -253,14 +257,14 @@ class TTestRecordNoName(TypedDict): except AttributeError: pytest.fail("validate_dict raised AttributeError unexpectedly") + test_item_2 = {"name": True} + with pytest.raises(DictValidationException): + validate_dict(TTestRecordNoName, test_item_2, path=".") -def test_callable() -> None: - TDynHintType = TypeVar("TDynHintType") - TFunHintTemplate = Callable[[TDataItem], TDynHintType] - t_table_hint_template = Union[TDynHintType, TFunHintTemplate[TDynHintType]] +def test_callable() -> None: class TTestRecordCallable(TypedDict): - prop: t_table_hint_template # type: ignore + prop: TTableHintTemplate # type: ignore def f(item: Union[TDataItem, TDynHintType]) -> TDynHintType: return item diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 2361f743be..03f87db923 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -40,7 +40,7 @@ CurrentSourceSchemaNotAvailable, InvalidParallelResourceDataType, ) -from dlt.extract.typing import TableNameMeta +from dlt.extract.items import TableNameMeta from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V9 @@ -914,7 +914,7 @@ def test_class_resource() -> None: def test_parallelized_resource_decorator() -> None: - """Test paralellized resources are wrapped correctly. + """Test parallelized resources are wrapped correctly. 
Note: tests for parallel execution are in test_resource_evaluation """ @@ -928,14 +928,17 @@ def some_gen(): gen = resource._pipe.gen() # type: ignore result = next(gen) # type: ignore[arg-type] assert result() == 1 + assert list(resource) == [1, 2, 3] # Same but wrapping generator directly resource = dlt.resource(some_gen(), parallelized=True) result = next(resource._pipe.gen) # type: ignore assert result() == 1 + # get remaining items + assert list(resource) == [2, 3] - # Wrap a transformer + # Wrap a yielding transformer def some_tx(item): yield item + 1 @@ -946,6 +949,18 @@ def some_tx(item): # Calling transformer returns the parallel wrapper generator inner = pipe_gen(1) # type: ignore assert next(inner)() == 2 # type: ignore + assert list(transformer) == [2, 3, 4] # add 1 to resource + + # Wrap a transformer function + def some_tx_func(item): + return list(range(item)) + + transformer = dlt.transformer(some_tx_func, parallelized=True, data_from=resource) + pipe_gen = transformer._pipe.gen + inner = pipe_gen(3) # type: ignore + # this is a regular function returning list + assert inner() == [0, 1, 2] # type: ignore[operator] + assert list(transformer) == [0, 0, 1, 0, 1, 2] # Invalid parallel resources diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index 577dca088c..68c1c82124 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -9,8 +9,8 @@ import dlt from dlt.common import sleep from dlt.common.typing import TDataItems -from dlt.extract.exceptions import CreatePipeException, ResourceExtractionError -from dlt.extract.typing import DataItemWithMeta, FilterItem, MapItem, YieldMapItem +from dlt.extract.exceptions import CreatePipeException, ResourceExtractionError, UnclosablePipe +from dlt.extract.items import DataItemWithMeta, FilterItem, MapItem, YieldMapItem from dlt.extract.pipe import Pipe from dlt.extract.pipe_iterator import PipeIterator, ManagedPipeIterator, PipeItem @@ -614,6 +614,21 @@ def pass_gen(item, meta): _f_items(list(PipeIterator.from_pipes(pipes))) +def test_explicit_close_pipe() -> None: + list_pipe = Pipe.from_data("list_pipe", iter([1, 2, 3])) + with pytest.raises(UnclosablePipe): + list_pipe.close() + + # generator function cannot be closed + genfun_pipe = Pipe.from_data("genfun_pipe", lambda _: (yield from [1, 2, 3])) + with pytest.raises(UnclosablePipe): + genfun_pipe.close() + + gen_pipe = Pipe.from_data("gen_pipe", (lambda: (yield from [1, 2, 3]))()) + gen_pipe.close() + assert inspect.getgeneratorstate(gen_pipe.gen) == "GEN_CLOSED" # type: ignore[arg-type] + + close_pipe_got_exit = False close_pipe_yielding = False diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 6228efea03..7956c83947 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -1,8 +1,10 @@ import os +import asyncio from time import sleep from typing import Optional, Any +from unittest import mock from datetime import datetime # noqa: I251 -from itertools import chain +from itertools import chain, count import duckdb import pytest @@ -27,7 +29,12 @@ from dlt.pipeline.exceptions import PipelineStepFailed from tests.extract.utils import AssertItems, data_item_to_list -from tests.utils import data_to_item_format, TDataItemFormat, ALL_DATA_ITEM_FORMATS +from tests.utils import ( + data_item_length, + data_to_item_format, + TDataItemFormat, + ALL_DATA_ITEM_FORMATS, +) @pytest.mark.parametrize("item_type", ALL_DATA_ITEM_FORMATS) @@ -1333,6 +1340,119 @@ 
def ascending_single_item( pipeline.extract(ascending_single_item()) +@pytest.mark.parametrize("item_type", ALL_DATA_ITEM_FORMATS) +def test_async_row_order_out_of_range(item_type: TDataItemFormat) -> None: + @dlt.resource + async def descending( + updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( + "updated_at", initial_value=10, row_order="desc" + ) + ) -> Any: + for chunk in chunks(count(start=48, step=-1), 10): + await asyncio.sleep(0.01) + data = [{"updated_at": i} for i in chunk] + yield data_to_item_format(item_type, data) + + data = list(descending) + assert data_item_length(data) == 48 - 10 + 1 # both bounds included + + +@pytest.mark.parametrize("item_type", ALL_DATA_ITEM_FORMATS) +def test_parallel_row_order_out_of_range(item_type: TDataItemFormat) -> None: + """Test automatic generator close for ordered rows""" + + @dlt.resource(parallelized=True) + def descending( + updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( + "updated_at", initial_value=10, row_order="desc" + ) + ) -> Any: + for chunk in chunks(count(start=48, step=-1), 10): + data = [{"updated_at": i} for i in chunk] + yield data_to_item_format(item_type, data) + + data = list(descending) + assert data_item_length(data) == 48 - 10 + 1 # both bounds included + + +def test_transformer_row_order_out_of_range() -> None: + out_of_range = [] + + @dlt.transformer + def descending( + package: int, + updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( + "updated_at", initial_value=10, row_order="desc", primary_key="updated_at" + ), + ) -> Any: + for chunk in chunks(count(start=48, step=-1), 10): + data = [{"updated_at": i, "package": package} for i in chunk] + yield data_to_item_format("json", data) + if updated_at.can_close(): + out_of_range.append(package) + return + + data = list([3, 2, 1] | descending) + assert len(data) == 48 - 10 + 1 + # we take full package 3 and then nothing in 1 and 2 + assert len(out_of_range) == 3 + + +@pytest.mark.parametrize("item_type", ALL_DATA_ITEM_FORMATS) +def test_row_order_out_of_range(item_type: TDataItemFormat) -> None: + """Test automatic generator close for ordered rows""" + + @dlt.resource + def descending( + updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( + "updated_at", initial_value=10, row_order="desc" + ) + ) -> Any: + for chunk in chunks(count(start=48, step=-1), 10): + data = [{"updated_at": i} for i in chunk] + yield data_to_item_format(item_type, data) + + data = list(descending) + assert data_item_length(data) == 48 - 10 + 1 # both bounds included + + @dlt.resource + def ascending( + updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( + "updated_at", initial_value=22, end_value=45, row_order="asc" + ) + ) -> Any: + # use INFINITE sequence so this test wil not stop if closing logic is flawed + for chunk in chunks(count(start=22), 10): + data = [{"updated_at": i} for i in chunk] + yield data_to_item_format(item_type, data) + + data = list(ascending) + assert data_item_length(data) == 45 - 22 + + # use wrong row order, this will prevent end value to close pipe + + @dlt.resource + def ascending_desc( + updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( + "updated_at", initial_value=22, end_value=45, row_order="desc" + ) + ) -> Any: + for chunk in chunks(range(22, 100), 10): + data = [{"updated_at": i} for i in chunk] + yield data_to_item_format(item_type, data) + + from dlt.extract import pipe + + with mock.patch.object( + pipe.Pipe, + "close", + 
side_effect=RuntimeError("Close pipe should not be called"), + ) as close_pipe: + data = list(ascending_desc) + assert close_pipe.assert_not_called + assert data_item_length(data) == 45 - 22 + + @pytest.mark.parametrize("item_type", ALL_DATA_ITEM_FORMATS) def test_get_incremental_value_type(item_type: TDataItemFormat) -> None: assert dlt.sources.incremental("id").get_incremental_value_type() is Any diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 48f966f831..5895c3b658 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -1130,8 +1130,29 @@ def mysource(): s = mysource() assert s.exhausted is False - assert next(iter(s)) == 2 # transformer is returned befor resource - assert s.exhausted is True + assert next(iter(s)) == 2 # transformer is returned before resource + assert s.exhausted is False + + +def test_exhausted_with_limit() -> None: + def open_generator_data(): + yield from [1, 2, 3, 4] + + s = DltSource( + Schema("source"), + "module", + [dlt.resource(open_generator_data)], + ) + assert s.exhausted is False + list(s) + assert s.exhausted is False + + # use limit + s.add_limit(1) + list(s) + # must still be false, limit should not open generator if it is still generator function + assert s.exhausted is False + assert list(s) == [1] def test_clone_resource_with_name() -> None: diff --git a/tests/extract/test_validation.py b/tests/extract/test_validation.py index 045f75ab73..b9307ab97c 100644 --- a/tests/extract/test_validation.py +++ b/tests/extract/test_validation.py @@ -10,7 +10,7 @@ from dlt.common.libs.pydantic import BaseModel from dlt.extract import DltResource -from dlt.extract.typing import ValidateItem +from dlt.extract.items import ValidateItem from dlt.extract.validation import PydanticValidator from dlt.extract.exceptions import ResourceExtractionError from dlt.pipeline.exceptions import PipelineStepFailed diff --git a/tests/extract/utils.py b/tests/extract/utils.py index 98e798d0f0..170781ba3c 100644 --- a/tests/extract/utils.py +++ b/tests/extract/utils.py @@ -6,7 +6,7 @@ from dlt.common.typing import TDataItem, TDataItems from dlt.extract.extract import ExtractStorage -from dlt.extract.typing import ItemTransform +from dlt.extract.items import ItemTransform from tests.utils import TDataItemFormat diff --git a/tests/sources/helpers/test_requests.py b/tests/sources/helpers/test_requests.py index 695fa93eca..aefdf23e77 100644 --- a/tests/sources/helpers/test_requests.py +++ b/tests/sources/helpers/test_requests.py @@ -1,7 +1,5 @@ -from contextlib import contextmanager -from typing import Iterator, Any, cast, Type +from typing import Iterator, Type from unittest import mock -from email.utils import format_datetime import os import random @@ -105,6 +103,25 @@ def test_retry_on_status_without_raise_for_status(mock_sleep: mock.MagicMock) -> assert m.call_count == RunConfiguration.request_max_attempts +def test_hooks_with_raise_for_statue() -> None: + url = "https://example.com/data" + session = Client(raise_for_status=True).session + + def _no_content(resp: requests.Response, *args, **kwargs) -> requests.Response: + resp.status_code = 204 + resp._content = b"[]" + return resp + + with requests_mock.mock(session=session) as m: + m.get(url, status_code=503) + response = session.get(url, hooks={"response": _no_content}) + # we simulate empty response + assert response.status_code == 204 + assert response.json() == [] + + assert m.call_count == 1 + + @pytest.mark.parametrize( "exception_class", [requests.ConnectionError, 
requests.ConnectTimeout, requests.exceptions.ChunkedEncodingError], diff --git a/tests/utils.py b/tests/utils.py index 1d2ace2533..dd03279def 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -205,6 +205,25 @@ def data_to_item_format( raise ValueError(f"Unknown item format: {item_format}") +def data_item_length(data: TDataItem) -> int: + import pandas as pd + from dlt.common.libs.pyarrow import pyarrow as pa + + if isinstance(data, list): + # If data is a list, check if it's a list of supported data types + if all(isinstance(item, (list, pd.DataFrame, pa.Table, pa.RecordBatch)) for item in data): + return sum(data_item_length(item) for item in data) + # If it's a list but not a list of supported types, treat it as a single list object + else: + return len(data) + elif isinstance(data, pd.DataFrame): + return len(data.index) + elif isinstance(data, pa.Table) or isinstance(data, pa.RecordBatch): + return data.num_rows + else: + raise TypeError("Unsupported data type.") + + def init_test_logging(c: RunConfiguration = None) -> None: if not c: c = resolve_configuration(RunConfiguration()) From 23d52224af2b6874d6d55d1b8511c42647d50648 Mon Sep 17 00:00:00 2001 From: Sultan Iman <354868+sultaniman@users.noreply.github.com> Date: Tue, 5 Mar 2024 21:18:46 +0100 Subject: [PATCH 14/17] Quick fix to serialize load metrics as list instead of a dictionary (#1051) * Quick fix to serialize load metrics as list instead of a dictionary * Revert data type change for metrics * Add load id to metrics * Enrich metrics with load_id in StepInfo.asdict * Check last_trace schema hashes * Extend test suite for pipeline load info schema * Add faker to generate test data * Describe test scenario * Adjust test description docstring * Adjust test doctstrings * Rever airflow change * Remove faker random seed * Refactor test flow * Remove faker * Move tests to pipeline tests * Move test data and resources inside the test * fixes schema replace content hash tracking * fixes trace shape test --------- Co-authored-by: Marcin Rudolf --- dlt/common/pipeline.py | 18 ++-- dlt/common/schema/schema.py | 6 +- dlt/extract/extract.py | 13 ++- tests/common/schema/test_schema.py | 19 ++++ tests/pipeline/test_pipeline.py | 145 +++++++++++++++++++++++++++++ 5 files changed, 190 insertions(+), 11 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 6b7b308b44..df221ec703 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -102,13 +102,19 @@ def finished_at(self) -> datetime.datetime: def asdict(self) -> DictStrAny: # to be mixed with NamedTuple - d: DictStrAny = self._asdict() # type: ignore - d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name} - d["load_packages"] = [package.asdict() for package in self.load_packages] + step_info: DictStrAny = self._asdict() # type: ignore + step_info["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name} + step_info["load_packages"] = [package.asdict() for package in self.load_packages] if self.metrics: - d["started_at"] = self.started_at - d["finished_at"] = self.finished_at - return d + step_info["started_at"] = self.started_at + step_info["finished_at"] = self.finished_at + all_metrics = [] + for load_id, metrics in step_info["metrics"].items(): + for metric in metrics: + all_metrics.append({**dict(metric), "load_id": load_id}) + + step_info["metrics"] = all_metrics + return step_info def __str__(self) -> str: return self.asstr(verbosity=0) diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 6174afd8b4..4c81c8af72 
100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -136,8 +136,10 @@ def replace_schema_content( if link_to_replaced_schema: replaced_version_hash = self.stored_version_hash assert replaced_version_hash is not None - utils.store_prev_hash(stored_schema, replaced_version_hash) - stored_schema["version_hash"] = replaced_version_hash + # do not store hash if the replaced schema is identical + if stored_schema["version_hash"] != replaced_version_hash: + utils.store_prev_hash(stored_schema, replaced_version_hash) + stored_schema["version_hash"] = replaced_version_hash self._reset_schema(schema.name, schema._normalizers_config) self._from_stored_schema(stored_schema) diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index cdd61403a7..66122ecbc5 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -1,6 +1,6 @@ import contextlib from collections.abc import Sequence as C_Sequence -from datetime import datetime # noqa: 251 +from copy import copy import itertools from typing import List, Set, Dict, Optional, Set, Any import yaml @@ -33,6 +33,7 @@ from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints +from dlt.extract.incremental import IncrementalResourceWrapper from dlt.extract.pipe_iterator import PipeIterator from dlt.extract.source import DltSource from dlt.extract.resource import DltResource @@ -200,14 +201,20 @@ def _compute_metrics(self, load_id: str, source: DltSource) -> ExtractMetrics: for resource in source.selected_resources.values(): # cleanup the hints hints = clean_hints[resource.name] = {} - resource_hints = resource._hints or resource.compute_table_schema() + resource_hints = copy(resource._hints) or resource.compute_table_schema() + if resource.incremental and "incremental" not in resource_hints: + resource_hints["incremental"] = resource.incremental # type: ignore for name, hint in resource_hints.items(): if hint is None or name in ["validator"]: continue if name == "incremental": # represent incremental as dictionary (it derives from BaseConfiguration) - hints[name] = dict(hint) # type: ignore[call-overload] + if isinstance(hint, IncrementalResourceWrapper): + hint = hint._incremental + # sometimes internal incremental is not bound + if hint: + hints[name] = dict(hint) # type: ignore[call-overload] continue if name == "original_columns": # this is original type of the columns ie. 
Pydantic model diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index 372171ce1c..653e9cc351 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -250,6 +250,25 @@ def test_replace_schema_content() -> None: assert replaced_stored_hash == schema.stored_version_hash assert schema.stored_version_hash != schema.version_hash + # replace with self + eth_v5 = load_yml_case("schemas/eth/ethereum_schema_v5") + schema_eth = Schema.from_dict(eth_v5, bump_version=True) # type: ignore[arg-type] + stored_hash = schema_eth.stored_version_hash + schema_eth.replace_schema_content(schema_eth) + assert stored_hash == schema_eth.stored_version_hash + assert stored_hash == schema_eth.version_hash + assert stored_hash not in schema_eth.previous_hashes + + # replace with self but version is not bumped + eth_v5 = load_yml_case("schemas/eth/ethereum_schema_v5") + schema_eth = Schema.from_dict(eth_v5, bump_version=False) # type: ignore[arg-type] + stored_hash = schema_eth.stored_version_hash + schema_eth.replace_schema_content(schema_eth) + assert stored_hash == schema_eth.stored_version_hash + assert stored_hash != schema_eth.version_hash + assert stored_hash in schema_eth.previous_hashes + assert schema_eth.version_hash not in schema_eth.previous_hashes + @pytest.mark.parametrize( "columns,hint,value", diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 85fd7d9f34..eb76a0c840 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -3,6 +3,7 @@ import itertools import logging import os +import random from time import sleep from typing import Any, Tuple, cast import threading @@ -1714,3 +1715,147 @@ def source(): "\uf027a", ] assert len(load_info.loads_ids) == 1 + + +def test_pipeline_load_info_metrics_schema_is_not_chaning() -> None: + """Test if load info schema is idempotent throughout multiple load cycles + + ## Setup + + We will run the same pipeline with + + 1. A single source returning one resource and collect `schema.version_hash`, + 2. Another source returning 2 resources with more complex data and collect `schema.version_hash`, + 3. At last we run both sources, + 4. For each 1. 2. 3. we load `last_extract_info`, `last_normalize_info` and `last_load_info` and collect `schema.version_hash` + + ## Expected + + `version_hash` collected in each stage should remain the same at all times. 
+ """ + data = [ + {"id": 1, "name": "Alice"}, + {"id": 2, "name": "Bob"}, + ] + + # this source must have all the hints so other sources do not change trace schema (extract/hints) + + @dlt.source + def users_source(): + return dlt.resource([data], name="users_resource") + + @dlt.source + def taxi_demand_source(): + @dlt.resource( + primary_key="city", columns=[{"name": "id", "data_type": "bigint", "precision": 4}] + ) + def locations(idx=dlt.sources.incremental("id")): + for idx in range(10): + yield { + "id": idx, + "address": f"address-{idx}", + "city": f"city-{idx}", + } + + @dlt.resource(primary_key="id") + def demand_map(): + for idx in range(10): + yield { + "id": idx, + "city": f"city-{idx}", + "demand": random.randint(0, 10000), + } + + return [locations, demand_map] + + schema = dlt.Schema(name="nice_load_info_schema") + pipeline = dlt.pipeline( + pipeline_name="quick_start", + destination="duckdb", + dataset_name="mydata", + # export_schema_path="schemas", + ) + + taxi_load_info = pipeline.run( + taxi_demand_source(), + ) + + schema_hashset = set() + pipeline.run( + [taxi_load_info], + table_name="_load_info", + schema=schema, + ) + + pipeline.run( + [pipeline.last_trace.last_normalize_info], + table_name="_normalize_info", + schema=schema, + ) + + pipeline.run( + [pipeline.last_trace.last_extract_info], + table_name="_extract_info", + schema=schema, + ) + schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash) + trace_schema = pipeline.schemas["nice_load_info_schema"].to_pretty_yaml() + + users_load_info = pipeline.run( + users_source(), + ) + + pipeline.run( + [users_load_info], + table_name="_load_info", + schema=schema, + ) + assert trace_schema == pipeline.schemas["nice_load_info_schema"].to_pretty_yaml() + schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash) + assert len(schema_hashset) == 1 + + pipeline.run( + [pipeline.last_trace.last_normalize_info], + table_name="_normalize_info", + schema=schema, + ) + schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash) + assert len(schema_hashset) == 1 + + pipeline.run( + [pipeline.last_trace.last_extract_info], + table_name="_extract_info", + schema=schema, + ) + schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash) + assert len(schema_hashset) == 1 + + load_info = pipeline.run( + [users_source(), taxi_demand_source()], + ) + + pipeline.run( + [load_info], + table_name="_load_info", + schema=schema, + ) + + schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash) + + pipeline.run( + [pipeline.last_trace.last_normalize_info], + table_name="_normalize_info", + schema=schema, + ) + + schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash) + + pipeline.run( + [pipeline.last_trace.last_extract_info], + table_name="_extract_info", + schema=schema, + ) + + schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash) + + assert len(schema_hashset) == 1 From adb6aa4a97d853e27e2115bfc7430edb035b5fff Mon Sep 17 00:00:00 2001 From: David Scharf Date: Tue, 5 Mar 2024 21:53:30 +0100 Subject: [PATCH 15/17] fix import schema yaml (#1013) * make sure import schema is respected if non other is present * add some previously missing tests * fix tests * fix bug in loading import schema * switch to dummy destination for tests * runs import tests on ci * load schema for live schemas will not committ live schema * committs and rollbacks live schemas in with_schema_sync, saves imported schema when no exception. 
uses load_schema to import schema on extract * moves import schema tests * fix bug in stored schema comparison * adds comment on restoring schema names and default schema name on exception * adds more tests and fixes one bug * align resource and pure data source schema with regular schema * fix linting and revert changes in basic resource schema resolution * fix one test * fix 2 review comments --------- Co-authored-by: Marcin Rudolf --- .github/workflows/test_common.yml | 4 +- dlt/common/storages/live_schema_storage.py | 22 ++- dlt/common/storages/schema_storage.py | 9 +- dlt/extract/extract.py | 2 + dlt/pipeline/pipeline.py | 51 +++-- tests/pipeline/test_import_export_schema.py | 206 ++++++++++++++++++++ tests/pipeline/test_pipeline.py | 10 +- 7 files changed, 277 insertions(+), 27 deletions(-) create mode 100644 tests/pipeline/test_import_export_schema.py diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index 6ec1212d1a..2160025ea0 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -83,11 +83,11 @@ jobs: run: poetry install --no-interaction -E duckdb --with sentry-sdk - run: | - poetry run pytest tests/pipeline/test_pipeline.py + poetry run pytest tests/pipeline/test_pipeline.py tests/pipeline/test_import_export_schema.py if: runner.os != 'Windows' name: Run pipeline smoke tests with minimum deps Linux/MAC - run: | - poetry run pytest tests/pipeline/test_pipeline.py + poetry run pytest tests/pipeline/test_pipeline.py tests/pipeline/test_import_export_schema.py if: runner.os == 'Windows' name: Run smoke tests with minimum deps Windows shell: cmd diff --git a/dlt/common/storages/live_schema_storage.py b/dlt/common/storages/live_schema_storage.py index 838e65af40..d3d5f14fe5 100644 --- a/dlt/common/storages/live_schema_storage.py +++ b/dlt/common/storages/live_schema_storage.py @@ -1,7 +1,8 @@ -from typing import Dict, List +from typing import Dict, List, cast from dlt.common.schema.schema import Schema from dlt.common.configuration.accessors import config +from dlt.common.storages.exceptions import SchemaNotFoundError from dlt.common.storages.schema_storage import SchemaStorage from dlt.common.storages.configuration import SchemaStorageConfiguration @@ -23,10 +24,10 @@ def __getitem__(self, name: str) -> Schema: return schema - def load_schema(self, name: str) -> Schema: - self.commit_live_schema(name) - # now live schema is saved so we can load it with the changes - return super().load_schema(name) + # def load_schema(self, name: str) -> Schema: + # self.commit_live_schema(name) + # # now live schema is saved so we can load it with the changes + # return super().load_schema(name) def save_schema(self, schema: Schema) -> str: rv = super().save_schema(schema) @@ -55,6 +56,17 @@ def commit_live_schema(self, name: str) -> Schema: self._save_schema(live_schema) return live_schema + def is_live_schema_committed(self, name: str) -> bool: + """Checks if live schema is present in storage and have same hash""" + live_schema = self.live_schemas.get(name) + if live_schema is None: + raise SchemaNotFoundError(name, f"live-schema://{name}") + try: + stored_schema_json = self._load_schema_json(name) + return live_schema.version_hash == cast(str, stored_schema_json.get("version_hash")) + except FileNotFoundError: + return False + def update_live_schema(self, schema: Schema, can_create_new: bool = True) -> None: """Will update live schema content without writing to storage. 
Optionally allows to create a new live schema""" live_schema = self.live_schemas.get(schema.name) diff --git a/dlt/common/storages/schema_storage.py b/dlt/common/storages/schema_storage.py index a43b8a1f9b..4745d50dcc 100644 --- a/dlt/common/storages/schema_storage.py +++ b/dlt/common/storages/schema_storage.py @@ -1,5 +1,5 @@ import yaml -from typing import Iterator, List, Mapping, Tuple +from typing import Iterator, List, Mapping, Tuple, cast from dlt.common import json, logger from dlt.common.configuration import with_config @@ -31,12 +31,15 @@ def __init__( self.config = config self.storage = FileStorage(config.schema_volume_path, makedirs=makedirs) + def _load_schema_json(self, name: str) -> DictStrAny: + schema_file = self._file_name_in_store(name, "json") + return cast(DictStrAny, json.loads(self.storage.load(schema_file))) + def load_schema(self, name: str) -> Schema: # loads a schema from a store holding many schemas - schema_file = self._file_name_in_store(name, "json") storage_schema: DictStrAny = None try: - storage_schema = json.loads(self.storage.load(schema_file)) + storage_schema = self._load_schema_json(name) # prevent external modifications of schemas kept in storage if not verify_schema_hash(storage_schema, verifies_if_not_migrated=True): raise InStorageSchemaModified(name, self.config.schema_volume_path) diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 66122ecbc5..2ff813a2de 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -76,6 +76,8 @@ def choose_schema() -> Schema: """Except of explicitly passed schema, use a clone that will get discarded if extraction fails""" if schema: schema_ = schema + # TODO: We should start with a new schema of the same name here ideally, but many tests fail + # because of this. So some investigation is needed. 
elif pipeline.default_schema_name: schema_ = pipeline.schemas[pipeline.default_schema_name].clone() else: diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 44edcf2da5..185a11962a 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -161,13 +161,30 @@ def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any: for name in self._schema_storage.live_schemas: # refresh live schemas in storage or import schema path self._schema_storage.commit_live_schema(name) - rv = f(self, *args, **kwargs) - # save modified live schemas - for name in self._schema_storage.live_schemas: - self._schema_storage.commit_live_schema(name) - # refresh list of schemas if any new schemas are added - self.schema_names = self._list_schemas_sorted() - return rv + try: + rv = f(self, *args, **kwargs) + except Exception: + # because we committed live schema before calling f, we may safely + # drop all changes in live schemas + for name in list(self._schema_storage.live_schemas.keys()): + try: + schema = self._schema_storage.load_schema(name) + self._schema_storage.update_live_schema(schema, can_create_new=False) + except FileNotFoundError: + # no storage schema yet so pop live schema (created in call to f) + self._schema_storage.live_schemas.pop(name, None) + # NOTE: with_state_sync will restore schema_names and default_schema_name + # so we do not need to do that here + raise + else: + # save modified live schemas + for name, schema in self._schema_storage.live_schemas.items(): + self._schema_storage.commit_live_schema(name) + # also save import schemas only here + self._schema_storage.save_import_schema_if_not_exists(schema) + # refresh list of schemas if any new schemas are added + self.schema_names = self._list_schemas_sorted() + return rv return _wrap # type: ignore @@ -1019,20 +1036,32 @@ def _extract_source( self, extract: Extract, source: DltSource, max_parallel_items: int, workers: int ) -> str: # discover the existing pipeline schema - if source.schema.name in self.schemas: - # use clone until extraction complete - pipeline_schema = self.schemas[source.schema.name].clone() + try: + # all live schemas are initially committed and during the extract will accumulate changes in memory + # if schema is committed try to take schema from storage + if self._schema_storage.is_live_schema_committed(source.schema.name): + # this will (1) save live schema if modified (2) look for import schema if present + # (3) load import schema an overwrite pipeline schema if import schema modified + # (4) load pipeline schema if no import schema is present + pipeline_schema = self.schemas.load_schema(source.schema.name) + else: + # if schema is not committed we know we are in process of extraction + pipeline_schema = self.schemas[source.schema.name] + pipeline_schema = pipeline_schema.clone() # use clone until extraction complete # apply all changes in the source schema to pipeline schema # NOTE: we do not apply contracts to changes done programmatically pipeline_schema.update_schema(source.schema) # replace schema in the source source.schema = pipeline_schema + except FileNotFoundError: + pass # extract into pipeline schema load_id = extract.extract(source, max_parallel_items, workers) # save import with fully discovered schema - self._schema_storage.save_import_schema_if_not_exists(source.schema) + # NOTE: moved to with_schema_sync, remove this if all test pass + # self._schema_storage.save_import_schema_if_not_exists(source.schema) # update live schema but not update the store yet 
self._schema_storage.update_live_schema(source.schema) diff --git a/tests/pipeline/test_import_export_schema.py b/tests/pipeline/test_import_export_schema.py new file mode 100644 index 0000000000..b1c2284f24 --- /dev/null +++ b/tests/pipeline/test_import_export_schema.py @@ -0,0 +1,206 @@ +import dlt, os, pytest + +from dlt.common.utils import uniq_id + +from tests.utils import TEST_STORAGE_ROOT +from dlt.common.schema import Schema +from dlt.common.storages.schema_storage import SchemaStorage +from dlt.common.schema.exceptions import CannotCoerceColumnException +from dlt.pipeline.exceptions import PipelineStepFailed + +from dlt.destinations import dummy + + +IMPORT_SCHEMA_PATH = os.path.join(TEST_STORAGE_ROOT, "schemas", "import") +EXPORT_SCHEMA_PATH = os.path.join(TEST_STORAGE_ROOT, "schemas", "export") + + +EXAMPLE_DATA = [{"id": 1, "name": "dave"}] + + +def _get_import_schema(schema_name: str) -> Schema: + return SchemaStorage.load_schema_file(IMPORT_SCHEMA_PATH, schema_name) + + +def _get_export_schema(schema_name: str) -> Schema: + return SchemaStorage.load_schema_file(EXPORT_SCHEMA_PATH, schema_name) + + +def test_schemas_files_get_created() -> None: + name = "schema_test" + uniq_id() + + p = dlt.pipeline( + pipeline_name=name, + destination=dummy(completed_prob=1), + import_schema_path=IMPORT_SCHEMA_PATH, + export_schema_path=EXPORT_SCHEMA_PATH, + ) + + p.run(EXAMPLE_DATA, table_name="person") + + # basic check we have the table def in the export schema + export_schema = _get_export_schema(name) + assert export_schema.tables["person"]["columns"]["id"]["data_type"] == "bigint" + assert export_schema.tables["person"]["columns"]["name"]["data_type"] == "text" + + # discovered columns are not present in the import schema + import_schema = _get_import_schema(name) + assert "id" not in import_schema.tables["person"]["columns"] + assert "name" not in import_schema.tables["person"]["columns"] + + +def test_provided_columns_exported_to_import() -> None: + name = "schema_test" + uniq_id() + + p = dlt.pipeline( + pipeline_name=name, + destination=dummy(completed_prob=1), + import_schema_path=IMPORT_SCHEMA_PATH, + export_schema_path=EXPORT_SCHEMA_PATH, + ) + + p.run(EXAMPLE_DATA, table_name="person", columns={"id": {"data_type": "text"}}) + + # updated columns are in export + export_schema = _get_export_schema(name) + assert export_schema.tables["person"]["columns"]["id"]["data_type"] == "text" + assert export_schema.tables["person"]["columns"]["name"]["data_type"] == "text" + + # discovered columns are not present in the import schema + # but provided column is + import_schema = _get_import_schema(name) + assert "name" not in import_schema.tables["person"]["columns"] + assert import_schema.tables["person"]["columns"]["id"]["data_type"] == "text" + + +def test_import_schema_is_respected() -> None: + name = "schema_test" + uniq_id() + + p = dlt.pipeline( + pipeline_name=name, + destination=dummy(completed_prob=1), + import_schema_path=IMPORT_SCHEMA_PATH, + export_schema_path=EXPORT_SCHEMA_PATH, + ) + p.run(EXAMPLE_DATA, table_name="person") + assert p.default_schema.tables["person"]["columns"]["id"]["data_type"] == "bigint" + + # take default schema, modify column type and save it to import folder + modified_schema = p.default_schema.clone() + modified_schema.tables["person"]["columns"]["id"]["data_type"] = "text" + with open(os.path.join(IMPORT_SCHEMA_PATH, name + ".schema.yaml"), "w", encoding="utf-8") as f: + f.write(modified_schema.to_pretty_yaml()) + + # this will provoke a 
CannotCoerceColumnException + with pytest.raises(PipelineStepFailed) as exc: + p.run(EXAMPLE_DATA, table_name="person") + assert type(exc.value.exception) == CannotCoerceColumnException + + # schema is changed + assert p.default_schema.tables["person"]["columns"]["id"]["data_type"] == "text" + + # import schema is not overwritten + assert _get_import_schema(name).tables["person"]["columns"]["id"]["data_type"] == "text" + + # when creating a new schema (e.g. with full refresh), this will work + p = dlt.pipeline( + pipeline_name=name, + destination=dummy(completed_prob=1), + import_schema_path=IMPORT_SCHEMA_PATH, + export_schema_path=EXPORT_SCHEMA_PATH, + full_refresh=True, + ) + p.run(EXAMPLE_DATA, table_name="person") + assert p.default_schema.tables["person"]["columns"]["id"]["data_type"] == "text" + + # import schema is not overwritten + assert _get_import_schema(name).tables["person"]["columns"]["id"]["data_type"] == "text" + + # export now includes the modified column type + export_schema = _get_export_schema(name) + assert export_schema.tables["person"]["columns"]["id"]["data_type"] == "text" + assert export_schema.tables["person"]["columns"]["name"]["data_type"] == "text" + + +def test_only_explicit_hints_in_import_schema() -> None: + @dlt.source(schema_contract={"columns": "evolve"}) + def source(): + @dlt.resource(primary_key="id", name="person") + def resource(): + yield EXAMPLE_DATA + + return resource() + + p = dlt.pipeline( + pipeline_name=uniq_id(), + destination=dummy(completed_prob=1), + import_schema_path=IMPORT_SCHEMA_PATH, + export_schema_path=EXPORT_SCHEMA_PATH, + full_refresh=True, + ) + p.run(source()) + + # import schema has only the primary key hint, but no name or data types + import_schema = _get_import_schema("source") + assert import_schema.tables["person"]["columns"].keys() == {"id"} + assert import_schema.tables["person"]["columns"]["id"] == { + "nullable": False, + "primary_key": True, + "name": "id", + } + + # pipeline schema has all the stuff + assert p.default_schema.tables["person"]["columns"].keys() == { + "id", + "name", + "_dlt_load_id", + "_dlt_id", + } + assert p.default_schema.tables["person"]["columns"]["id"] == { + "nullable": False, + "primary_key": True, + "name": "id", + "data_type": "bigint", + } + + # adding column to the resource will not change the import schema, but the pipeline schema will evolve + @dlt.resource(primary_key="id", name="person", columns={"email": {"data_type": "text"}}) + def resource(): + yield EXAMPLE_DATA + + p.run(resource()) + + # check schemas + import_schema = _get_import_schema("source") + assert import_schema.tables["person"]["columns"].keys() == {"id"} + assert p.default_schema.tables["person"]["columns"].keys() == { + "id", + "name", + "_dlt_load_id", + "_dlt_id", + "email", + } + + # changing the import schema will force full update + import_schema.tables["person"]["columns"]["age"] = { + "data_type": "bigint", + "nullable": True, + "name": "age", + } + with open( + os.path.join(IMPORT_SCHEMA_PATH, "source" + ".schema.yaml"), "w", encoding="utf-8" + ) as f: + f.write(import_schema.to_pretty_yaml()) + + # run with the original source, email hint should be gone after this, but we now have age + p.run(source()) + + assert p.default_schema.tables["person"]["columns"].keys() == { + "id", + "name", + "_dlt_load_id", + "_dlt_id", + "age", + } + import_schema = _get_import_schema("source") + assert import_schema.tables["person"]["columns"].keys() == {"id", "age"} diff --git a/tests/pipeline/test_pipeline.py 
b/tests/pipeline/test_pipeline.py index eb76a0c840..0cebeb2ff7 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -581,9 +581,8 @@ def data_piece_2(): assert p.first_run is True assert p.has_data is False assert p.default_schema_name is None - # one of the schemas is in memory - # TODO: we may want to fix that - assert len(p._schema_storage.list_schemas()) == 1 + # live schemas created during extract are popped from mem + assert len(p._schema_storage.list_schemas()) == 0 # restore the pipeline p = dlt.attach(pipeline_name) @@ -617,9 +616,8 @@ def data_schema_3(): # first run didn't really happen assert p.first_run is True assert p.has_data is False - # schemas from two sources are in memory - # TODO: we may want to fix that - assert len(p._schema_storage.list_schemas()) == 2 + # live schemas created during extract are popped from mem + assert len(p._schema_storage.list_schemas()) == 0 assert p.default_schema_name is None os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately From f608131080e8c7bfcbcd819e5093a78e3b903aa1 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Tue, 5 Mar 2024 21:56:20 +0100 Subject: [PATCH 16/17] bumps dlt version to 0.4.6 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 0eb33d885d..88e6bd9390 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dlt" -version = "0.4.5" +version = "0.4.6" description = "dlt is an open-source python-first scalable data loading library that does not require any backend to run." authors = ["dltHub Inc. "] maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "] From 3761335e6de765d9b7b45721b3cd67cb1a805b1c Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 6 Mar 2024 08:21:32 +0100 Subject: [PATCH 17/17] fixes wrong link in alerting docs --- docs/website/docs/running-in-production/alerting.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/website/docs/running-in-production/alerting.md b/docs/website/docs/running-in-production/alerting.md index 2eb787359d..1364c1f988 100644 --- a/docs/website/docs/running-in-production/alerting.md +++ b/docs/website/docs/running-in-production/alerting.md @@ -55,7 +55,7 @@ for package in info.load_packages: for table_name, table in package.schema_update.items(): # Iterate over each column in the current table for column_name, column in table["columns"].items(): - # Send a message to the Slack channel with the table + # Send a message to the Slack channel with the table # and column update information send_slack_message( hook, @@ -66,7 +66,7 @@ for package in info.load_packages: ) ) ``` -Refer to this [example](../../docs/examples/chess_production/) for a practical application of the method in a production environment. +Refer to this [example](../examples/chess_production/) for a practical application of the method in a production environment. -Similarly, Slack notifications can be extended to include information on pipeline execution times, loading durations, schema modifications, and more. For comprehensive details on configuring and sending messages to Slack, please read [here](./running#using-slack-to-send-messages). +Similarly, Slack notifications can be extended to include information on pipeline execution times, loading durations, schema modifications, and more. For comprehensive details on configuring and sending messages to Slack, please read [here](./running#using-slack-to-send-messages).
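
For readers following the metrics change in patch 14 ("serialize load metrics as list instead of a dictionary"), the sketch below illustrates the flattening that `StepInfo.asdict` now performs: a mapping of `load_id` to a list of metric dicts becomes a flat list where each metric carries its `load_id`. This is a standalone illustration, not part of any patch; the sample load id and metric fields are invented for the example and are not taken from dlt.

```py
# Minimal sketch of the metrics flattening introduced in StepInfo.asdict (patch 14).
# The load id and metric fields below are invented sample data.
from typing import Any, Dict, List


def flatten_step_metrics(metrics: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Turn {load_id: [metric, ...]} into a flat list of metrics, each tagged with its load_id."""
    all_metrics: List[Dict[str, Any]] = []
    for load_id, metric_list in metrics.items():
        for metric in metric_list:
            # copy the metric and attach the load id it belongs to
            all_metrics.append({**dict(metric), "load_id": load_id})
    return all_metrics


if __name__ == "__main__":
    sample = {
        "1709671126.123": [
            {"started_at": "2024-03-05T20:38:46Z", "finished_at": "2024-03-05T20:38:47Z"}
        ]
    }
    print(flatten_step_metrics(sample))
    # -> [{'started_at': ..., 'finished_at': ..., 'load_id': '1709671126.123'}]
```

Serializing metrics as a list rather than a dict keyed by load id keeps the generated `_load_info`-style tables from growing new columns on every load, which appears to be what the accompanying `test_pipeline_load_info_metrics_schema_is_not_chaning` test asserts by checking that the schema version hash stays constant across cycles.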