diff --git a/dlt/pipeline/helpers.py b/dlt/pipeline/helpers.py
index ce81b81433..83e1e66b29 100644
--- a/dlt/pipeline/helpers.py
+++ b/dlt/pipeline/helpers.py
@@ -4,15 +4,12 @@
     Sequence,
     Iterable,
     Optional,
-    Any,
-    Dict,
     Union,
     TYPE_CHECKING,
 )

 from dlt.common.jsonpath import TAnyJsonPath
 from dlt.common.exceptions import TerminalException
-from dlt.common.schema.schema import Schema
 from dlt.common.schema.typing import TSimpleRegex
 from dlt.common.pipeline import pipeline_state as current_pipeline_state, TRefreshMode
 from dlt.common.storages.load_package import TLoadPackageDropTablesState
@@ -139,7 +136,7 @@ def __call__(self) -> None:
         self.pipeline.normalize()

         try:
-            self.pipeline.load(raise_on_failed_jobs=True)
+            self.pipeline.load()
         except Exception:
             # Clear extracted state on failure so command can run again
             self.pipeline.drop_pending_packages()
diff --git a/docs/examples/CONTRIBUTING.md b/docs/examples/CONTRIBUTING.md
index 625a09d9c0..bca43ba9eb 100644
--- a/docs/examples/CONTRIBUTING.md
+++ b/docs/examples/CONTRIBUTING.md
@@ -10,7 +10,7 @@ Note: All paths in this guide are relative to the `dlt` repository directory.
 - Update the doc string which will compromise the generated markdown file, check the other examples how it is done
 - If your example requires any secrets, add the vars to the example.secrects.toml but do not enter the values.
 - Add your example code, make sure you have a `if __name__ = "__main__"` clause in which you run the example script, this will be used for testing
-- You should add one or two assertions after running your example and maybe also `load_info.raise_on_failed_jobs()`, this will help greatly with testing
+- You should add one or two assertions after running your example

 ## Testing
 - You can test your example simply by running your example script from your example folder. On CI a test will be automatically generated.
@@ -31,4 +31,4 @@ If you use any secrets for the code snippets, e.g. Zendesk requires credentials.
 If your example requires any additional dependency, then you can add it

 - To `pyproject.toml` in the `[tool.poetry.group.docs.dependencies]` section.
-- Do not forget to update your `poetry.lock` file with `poetry lock --no-update` command and commit.
\ No newline at end of file
+- Do not forget to update your `poetry.lock` file with `poetry lock --no-update` command and commit.
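Taken together, the `helpers.py` and `CONTRIBUTING.md` hunks above reflect that loads now raise on failed jobs by default (see the `load.raise_on_failed_jobs` toggle in the `running.md` hunk at the end of this diff), so neither the explicit `raise_on_failed_jobs=True` argument nor a trailing `load_info.raise_on_failed_jobs()` call is needed. Below is a minimal sketch of what an example script looks like under that default; the pipeline arguments and the `PipelineStepFailed` handling are illustrative assumptions, not part of this diff:

```python
import dlt
from dlt.pipeline.exceptions import PipelineStepFailed

# hypothetical pipeline purely for illustration
pipeline = dlt.pipeline(
    pipeline_name="example", destination="duckdb", dataset_name="example_data"
)

try:
    # with load.raise_on_failed_jobs defaulting to true, a failed load job
    # aborts the load and raises here, so no post-run check is needed
    load_info = pipeline.run([{"id": 1}], table_name="items")
    print(load_info)
except PipelineStepFailed as exc:
    # the failing step and the original job error travel with the exception
    print(exc)
    raise
```

Under the old default, the same failure would only surface if the script called `load_info.raise_on_failed_jobs()` itself, which is exactly the boilerplate the hunks below remove from every example.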
diff --git a/docs/examples/_template/_template.py b/docs/examples/_template/_template.py
index cdd38f8204..ae156b6f0b 100644
--- a/docs/examples/_template/_template.py
+++ b/docs/examples/_template/_template.py
@@ -25,6 +25,3 @@
     # Extract, normalize, and load the data
     load_info = pipeline.run([1, 2, 3], table_name="player")
     print(load_info)
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/chess/chess.py b/docs/examples/chess/chess.py
index 7b577c2646..6af431b330 100644
--- a/docs/examples/chess/chess.py
+++ b/docs/examples/chess/chess.py
@@ -56,6 +56,3 @@ def players_games(username: Any) -> Iterator[TDataItems]:
     ).run(chess(max_players=5, month=9))
     # display where the data went
     print(load_info)
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/chess_production/chess_production.py b/docs/examples/chess_production/chess_production.py
index c0f11203c8..bfbc16b39e 100644
--- a/docs/examples/chess_production/chess_production.py
+++ b/docs/examples/chess_production/chess_production.py
@@ -88,8 +88,6 @@ def load_data_with_retry(pipeline, data):
             load_info = pipeline.run(data)
             logger.info(str(load_info))
-            # raise on failed jobs
-            load_info.raise_on_failed_jobs()
             # send notification
             send_slack_message(
                 pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
             )
@@ -170,6 +168,3 @@ def load_data_with_retry(pipeline, data):
     # get data for a few famous players
     data = chess(max_players=MAX_PLAYERS)
     load_info = load_data_with_retry(pipeline, data)
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/connector_x_arrow/connector_x_arrow.py b/docs/examples/connector_x_arrow/connector_x_arrow.py
index 9603fb2ba0..a321f94580 100644
--- a/docs/examples/connector_x_arrow/connector_x_arrow.py
+++ b/docs/examples/connector_x_arrow/connector_x_arrow.py
@@ -67,6 +67,3 @@ def genome_resource():
     # check that stuff was loaded
     row_counts = pipeline.last_trace.last_normalize_info.row_counts
     assert row_counts["genome"] == 1000
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py b/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py
index a4ed9601a3..305c7d1f1a 100644
--- a/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py
+++ b/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py
@@ -21,7 +21,6 @@
 __source_name__ = "spotify"

 import datetime  # noqa: I251
-import os
 from dataclasses import dataclass, fields
 from pathlib import Path
 from typing import Any
@@ -142,7 +141,6 @@ def lancedb_destination(items: TDataItems, table: TTableSchema) -> None:
     )

     load_info = pipeline.run(spotify_shows())
-    load_info.raise_on_failed_jobs()
     print(load_info)

     row_counts = pipeline.last_trace.last_normalize_info
diff --git a/docs/examples/custom_naming/custom_naming.py b/docs/examples/custom_naming/custom_naming.py
index e99e582213..74feeb13ec 100644
--- a/docs/examples/custom_naming/custom_naming.py
+++ b/docs/examples/custom_naming/custom_naming.py
@@ -46,8 +46,6 @@
     # Extract, normalize, and load the data
     load_info = pipeline.run([{"StückId": 1}], table_name="Ausrüstung")
     print(load_info)
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
     with pipeline.sql_client() as client:
         # NOTE: we quote case sensitive identifers
         with client.execute_query('SELECT "StückId" FROM "Ausrüstung"') as cur:
@@ -66,12 +64,10 @@
     # duckdb is case insensitive so tables and columns below would clash but sql_ci_no_collision prevents that
     data_1 = {"ItemID": 1, "itemid": "collides"}
     load_info = pipeline.run([data_1], table_name="BigData")
-    load_info.raise_on_failed_jobs()

     data_2 = {"1Data": 1, "_1data": "collides"}
     # use colliding table
     load_info = pipeline.run([data_2], table_name="bigdata")
-    load_info.raise_on_failed_jobs()

     with pipeline.sql_client() as client:
         from duckdb import DuckDBPyConnection
diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py
index fbc0686fb9..716009865e 100644
--- a/docs/examples/google_sheets/google_sheets.py
+++ b/docs/examples/google_sheets/google_sheets.py
@@ -100,6 +100,3 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
     print(row_counts.keys())
     assert row_counts["hidden_columns_merged_cells"] == 7
     assert row_counts["blank_columns"] == 21
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/incremental_loading/incremental_loading.py b/docs/examples/incremental_loading/incremental_loading.py
index f1de4eecfe..90c5e93347 100644
--- a/docs/examples/incremental_loading/incremental_loading.py
+++ b/docs/examples/incremental_loading/incremental_loading.py
@@ -147,6 +147,3 @@ def get_pages(
     # check that stuff was loaded
     row_counts = pipeline.last_trace.last_normalize_info.row_counts
     assert row_counts["ticket_events"] == 17
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/nested_data/nested_data.py b/docs/examples/nested_data/nested_data.py
index 046e566efd..3ce462302d 100644
--- a/docs/examples/nested_data/nested_data.py
+++ b/docs/examples/nested_data/nested_data.py
@@ -128,9 +128,6 @@ def convert_mongo_objs(value: Any) -> Any:
     tables.pop("_dlt_pipeline_state")
     assert len(tables) == 7, pipeline.last_trace.last_normalize_info

-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
-
     # The second method involves setting the max_table_nesting attribute directly
     # on the source data object.
     # This allows for dynamic control over the maximum nesting
@@ -149,9 +146,6 @@ def convert_mongo_objs(value: Any) -> Any:
     tables.pop("_dlt_pipeline_state")
     assert len(tables) == 1, pipeline.last_trace.last_normalize_info

-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
-
     # The third method involves applying data type hints to specific columns in the data.
     # In this case, we tell dlt that column 'cast' (containing a list of actors)
     # in 'movies' table should have type complex which means
@@ -168,6 +162,3 @@ def convert_mongo_objs(value: Any) -> Any:
     tables = pipeline.last_trace.last_normalize_info.row_counts
     tables.pop("_dlt_pipeline_state")
     assert len(tables) == 6, pipeline.last_trace.last_normalize_info
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
index 5fbba98a21..76629fc612 100644
--- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
+++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
@@ -80,6 +80,3 @@ def pdf_to_text(file_item, separate_pages: bool = False):
     client = weaviate.Client("http://localhost:8080")
     # get text of all the invoices in InvoiceText class we just created above
     print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/postgres_to_postgres/postgres_to_postgres.py b/docs/examples/postgres_to_postgres/postgres_to_postgres.py
index 3e88cb7ee8..aaebb224fd 100644
--- a/docs/examples/postgres_to_postgres/postgres_to_postgres.py
+++ b/docs/examples/postgres_to_postgres/postgres_to_postgres.py
@@ -214,9 +214,6 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
         assert row_counts["table_1"] == 9
         assert row_counts["table_2"] == 9

-        # make sure nothing failed
-        load_info.raise_on_failed_jobs()
-
     if load_type == "replace":
         # 4. Load DuckDB local database into Postgres
         print("##################################### START DUCKDB LOAD ########")
diff --git a/docs/examples/qdrant_zendesk/qdrant_zendesk.py b/docs/examples/qdrant_zendesk/qdrant_zendesk.py
index 9b6fbee150..18eea002b3 100644
--- a/docs/examples/qdrant_zendesk/qdrant_zendesk.py
+++ b/docs/examples/qdrant_zendesk/qdrant_zendesk.py
@@ -175,9 +175,6 @@ def get_pages(

     print(load_info)

-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
-
     # getting the authenticated Qdrant client to connect to your Qdrant database
     with pipeline.destination_client() as destination_client:
         from qdrant_client import QdrantClient
@@ -194,6 +191,3 @@ def get_pages(
     )

     assert len(response) <= 3 and len(response) > 0
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/examples/transformers/transformers.py b/docs/examples/transformers/transformers.py
index 14d23de12d..ebf1f935ba 100644
--- a/docs/examples/transformers/transformers.py
+++ b/docs/examples/transformers/transformers.py
@@ -78,6 +78,3 @@ def species(pokemon_details):
     assert row_counts["pokemon"] == 20
     assert row_counts["species"] == 20
     assert "pokemon_list" not in row_counts
-
-    # make sure nothing failed
-    load_info.raise_on_failed_jobs()
diff --git a/docs/website/docs/general-usage/snippets/destination-snippets.py b/docs/website/docs/general-usage/snippets/destination-snippets.py
index 3484d943a0..d27eb4f0ae 100644
--- a/docs/website/docs/general-usage/snippets/destination-snippets.py
+++ b/docs/website/docs/general-usage/snippets/destination-snippets.py
@@ -94,7 +94,6 @@ def destination_instantiation_snippet() -> None:

     # here dependencies dependencies will be imported, secrets pulled and destination accessed
     # we pass bucket_url explicitly and expect credentials passed by config provider
     load_info = pipeline.load(destination=filesystem(bucket_url=bucket_url))
-    load_info.raise_on_failed_jobs()
     # @@@DLT_SNIPPET_END late_destination_access
diff --git a/docs/website/docs/reference/performance_snippets/performance-snippets.py b/docs/website/docs/reference/performance_snippets/performance-snippets.py
index 7fc0f2bce9..c7c26d9e7b 100644
--- a/docs/website/docs/reference/performance_snippets/performance-snippets.py
+++ b/docs/website/docs/reference/performance_snippets/performance-snippets.py
@@ -179,9 +179,7 @@ async def _run_async():
             loop.run_in_executor(executor, _run_pipeline, pipeline_1, async_table),
             loop.run_in_executor(executor, _run_pipeline, pipeline_2, defer_table),
         )
-        # result contains two LoadInfo instances
-        results[0].raise_on_failed_jobs()
-        results[1].raise_on_failed_jobs()
+        # results contains two LoadInfo instances

     # load data
     asyncio.run(_run_async())
diff --git a/docs/website/docs/running-in-production/running.md b/docs/website/docs/running-in-production/running.md
index 72a5bd463a..cc17dae7bb 100644
--- a/docs/website/docs/running-in-production/running.md
+++ b/docs/website/docs/running-in-production/running.md
@@ -267,8 +267,6 @@ All the jobs that were running in parallel are completed before raising. The dlt
 Here is an example `config.toml` to disable this behavior:

 ```toml
-# you should really load just one job at a time to get the deterministic behavior
-load.workers=1
 # I hope you know what you are doing by setting this to false
 load.raise_on_failed_jobs=false
 ```
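For completeness, here is a sketch of the opt-out path that the `config.toml` snippet above enables: with `load.raise_on_failed_jobs=false`, `run()` returns normally and failures must be inspected manually. The `has_failed_jobs`, `load_packages`, and `failed_message` names are assumptions about the `LoadInfo` API, not part of this diff:

```python
import dlt

# hypothetical pipeline purely for illustration
pipeline = dlt.pipeline(pipeline_name="example", destination="duckdb")
load_info = pipeline.run([{"id": 1}], table_name="items")

# with raising disabled, run() returns normally; failures must be read
# off the returned LoadInfo (attribute names assumed, see lead-in)
if load_info.has_failed_jobs:
    for package in load_info.load_packages:
        for job in package.jobs["failed_jobs"]:
            print(job.failed_message)
```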