diff --git a/dlt/destinations/impl/databricks/sql_client.py b/dlt/destinations/impl/databricks/sql_client.py index 4c06ef1cf3..8228fa06a4 100644 --- a/dlt/destinations/impl/databricks/sql_client.py +++ b/dlt/destinations/impl/databricks/sql_client.py @@ -148,7 +148,7 @@ def _make_database_exception(ex: Exception) -> Exception: return DatabaseTransientException(ex) return DatabaseTerminalException(ex) elif isinstance(ex, databricks_lib.OperationalError): - return DatabaseTerminalException(ex) + return DatabaseTransientException(ex) elif isinstance(ex, (databricks_lib.ProgrammingError, databricks_lib.IntegrityError)): return DatabaseTerminalException(ex) elif isinstance(ex, databricks_lib.DatabaseError): diff --git a/dlt/helpers/airflow_helper.py b/dlt/helpers/airflow_helper.py index 8494d3bba3..9623e65850 100644 --- a/dlt/helpers/airflow_helper.py +++ b/dlt/helpers/airflow_helper.py @@ -66,8 +66,7 @@ def __init__( buffer_max_items: int = 1000, retry_policy: Retrying = DEFAULT_RETRY_NO_RETRY, retry_pipeline_steps: Sequence[TPipelineStep] = ("load",), - fail_task_if_any_job_failed: bool = True, - abort_task_if_any_job_failed: bool = False, + abort_task_if_any_job_failed: bool = True, wipe_local_data: bool = True, save_load_info: bool = False, save_trace_info: bool = False, @@ -82,11 +81,7 @@ def __init__( The `data_folder` is available in certain Airflow deployments. In case of Composer, it is a location on the gcs bucket. `use_data_folder` is disabled and should be enabled only when needed. The operations on bucket are non-atomic and way slower than on local storage and should be avoided. - `fail_task_if_any_job_failed` will raise an exception if any of the loading jobs failed permanently and thus fail the current Airflow task. - This happens **after all dlt loading jobs executed**. See more here: https://dlthub.com/docs/running-in-production/running#failed-jobs - - `abort_task_if_any_job_failed` will abort the other dlt loading jobs and fail the Airflow task in any of the jobs failed. This may put your warehouse in - inconsistent state so the option is disabled by default. + `abort_task_if_any_job_failed` will abort the other dlt loading jobs and fail the Airflow task if any of the jobs failed. See https://dlthub.com/docs/running-in-production/running#handle-exceptions-failed-jobs-and-retry-the-pipeline. The load info and trace info can be optionally saved to the destination. See https://dlthub.com/docs/running-in-production/running#inspect-and-save-the-load-info-and-trace @@ -99,7 +94,6 @@ def __init__( buffer_max_items (int, optional): Maximum number of buffered items. Use 0 to keep dlt built-in limit. Defaults to 1000. retry_policy (_type_, optional): Tenacity retry policy. Defaults to no retry. retry_pipeline_steps (Sequence[TPipelineStep], optional): Which pipeline steps are eligible for retry. Defaults to ("load", ). - fail_task_if_any_job_failed (bool, optional): Will fail a task if any of the dlt load jobs failed. Defaults to True. wipe_local_data (bool, optional): Will wipe all the data created by pipeline, also in case of exception. Defaults to False. save_load_info (bool, optional): Will save extensive load info to the destination. Defaults to False. save_trace_info (bool, optional): Will save trace info to the destination. Defaults to False. 
@@ -112,7 +106,6 @@ def __init__( self.buffer_max_items = buffer_max_items self.retry_policy = retry_policy self.retry_pipeline_steps = retry_pipeline_steps - self.fail_task_if_any_job_failed = fail_task_if_any_job_failed self.abort_task_if_any_job_failed = abort_task_if_any_job_failed self.wipe_local_data = wipe_local_data self.save_load_info = save_load_info @@ -270,10 +263,11 @@ def _run( dlt.config["data_writer.buffer_max_items"] = self.buffer_max_items logger.info(f"Set data_writer.buffer_max_items to {self.buffer_max_items}") - # enable abort package if job failed - if self.abort_task_if_any_job_failed: - dlt.config["load.raise_on_failed_jobs"] = True - logger.info("Set load.abort_task_if_any_job_failed to True") + if self.abort_task_if_any_job_failed is not None: + dlt.config["load.raise_on_failed_jobs"] = self.abort_task_if_any_job_failed + logger.info( + f"Set load.abort_task_if_any_job_failed to {self.abort_task_if_any_job_failed}" + ) if self.log_progress_period > 0 and task_pipeline.collector == NULL_COLLECTOR: task_pipeline.collector = log(log_period=self.log_progress_period, logger=logger.LOGGER) @@ -329,9 +323,7 @@ def log_after_attempt(retry_state: RetryCallState) -> None: table_name="_trace", loader_file_format=loader_file_format, ) - # raise on failed jobs if requested - if self.fail_task_if_any_job_failed: - load_info.raise_on_failed_jobs() + finally: # always completely wipe out pipeline folder, in case of success and failure if self.wipe_local_data: diff --git a/dlt/load/configuration.py b/dlt/load/configuration.py index 836da516e9..54b7ad798f 100644 --- a/dlt/load/configuration.py +++ b/dlt/load/configuration.py @@ -13,7 +13,7 @@ class LoaderConfiguration(PoolRunnerConfiguration): parallelism_strategy: Optional[TLoaderParallelismStrategy] = None """Which parallelism strategy to use at load time""" pool_type: TPoolType = "thread" # mostly i/o (upload) so may be thread pool - raise_on_failed_jobs: bool = False + raise_on_failed_jobs: bool = True """when True, raises on terminally failed jobs immediately""" raise_on_max_retries: int = 5 """When gt 0 will raise when job reaches raise_on_max_retries""" diff --git a/dlt/pipeline/helpers.py b/dlt/pipeline/helpers.py index ce81b81433..83e1e66b29 100644 --- a/dlt/pipeline/helpers.py +++ b/dlt/pipeline/helpers.py @@ -4,15 +4,12 @@ Sequence, Iterable, Optional, - Any, - Dict, Union, TYPE_CHECKING, ) from dlt.common.jsonpath import TAnyJsonPath from dlt.common.exceptions import TerminalException -from dlt.common.schema.schema import Schema from dlt.common.schema.typing import TSimpleRegex from dlt.common.pipeline import pipeline_state as current_pipeline_state, TRefreshMode from dlt.common.storages.load_package import TLoadPackageDropTablesState @@ -139,7 +136,7 @@ def __call__(self) -> None: self.pipeline.normalize() try: - self.pipeline.load(raise_on_failed_jobs=True) + self.pipeline.load() except Exception: # Clear extracted state on failure so command can run again self.pipeline.drop_pending_packages() diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 6ad443e3d8..fa10f5ac89 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -536,7 +536,7 @@ def load( credentials: Any = None, *, workers: int = 20, - raise_on_failed_jobs: bool = False, + raise_on_failed_jobs: bool = ConfigValue, ) -> LoadInfo: """Loads the packages prepared by `normalize` method into the `dataset_name` at `destination`, optionally using provided `credentials`""" # set destination and default dataset if provided (this is 
the reason we have state sync here) diff --git a/docs/examples/CONTRIBUTING.md b/docs/examples/CONTRIBUTING.md index 625a09d9c0..bca43ba9eb 100644 --- a/docs/examples/CONTRIBUTING.md +++ b/docs/examples/CONTRIBUTING.md @@ -10,7 +10,7 @@ Note: All paths in this guide are relative to the `dlt` repository directory. - Update the doc string which will compromise the generated markdown file, check the other examples how it is done - If your example requires any secrets, add the vars to the example.secrects.toml but do not enter the values. - Add your example code, make sure you have a `if __name__ = "__main__"` clause in which you run the example script, this will be used for testing -- You should add one or two assertions after running your example and maybe also `load_info.raise_on_failed_jobs()`, this will help greatly with testing +- You should add one or two assertions after running your example ## Testing - You can test your example simply by running your example script from your example folder. On CI a test will be automatically generated. @@ -31,4 +31,4 @@ If you use any secrets for the code snippets, e.g. Zendesk requires credentials. If your example requires any additional dependency, then you can add it - To `pyproject.toml` in the `[tool.poetry.group.docs.dependencies]` section. -- Do not forget to update your `poetry.lock` file with `poetry lock --no-update` command and commit. \ No newline at end of file +- Do not forget to update your `poetry.lock` file with `poetry lock --no-update` command and commit. diff --git a/docs/examples/_template/_template.py b/docs/examples/_template/_template.py index cdd38f8204..ae156b6f0b 100644 --- a/docs/examples/_template/_template.py +++ b/docs/examples/_template/_template.py @@ -25,6 +25,3 @@ # Extract, normalize, and load the data load_info = pipeline.run([1, 2, 3], table_name="player") print(load_info) - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/chess/chess.py b/docs/examples/chess/chess.py index 7b577c2646..6af431b330 100644 --- a/docs/examples/chess/chess.py +++ b/docs/examples/chess/chess.py @@ -56,6 +56,3 @@ def players_games(username: Any) -> Iterator[TDataItems]: ).run(chess(max_players=5, month=9)) # display where the data went print(load_info) - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/chess_production/chess_production.py b/docs/examples/chess_production/chess_production.py index c0f11203c8..196bc13e18 100644 --- a/docs/examples/chess_production/chess_production.py +++ b/docs/examples/chess_production/chess_production.py @@ -88,8 +88,6 @@ def load_data_with_retry(pipeline, data): load_info = pipeline.run(data) logger.info(str(load_info)) - # raise on failed jobs - load_info.raise_on_failed_jobs() # send notification send_slack_message( pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!" 
@@ -169,7 +167,4 @@ def load_data_with_retry(pipeline, data): ) # get data for a few famous players data = chess(max_players=MAX_PLAYERS) - load_info = load_data_with_retry(pipeline, data) - - # make sure nothing failed - load_info.raise_on_failed_jobs() + load_data_with_retry(pipeline, data) diff --git a/docs/examples/connector_x_arrow/connector_x_arrow.py b/docs/examples/connector_x_arrow/connector_x_arrow.py index 9603fb2ba0..a321f94580 100644 --- a/docs/examples/connector_x_arrow/connector_x_arrow.py +++ b/docs/examples/connector_x_arrow/connector_x_arrow.py @@ -67,6 +67,3 @@ def genome_resource(): # check that stuff was loaded row_counts = pipeline.last_trace.last_normalize_info.row_counts assert row_counts["genome"] == 1000 - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py index 48a16f15c0..67e5d5bb4a 100644 --- a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py +++ b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py @@ -91,6 +91,3 @@ def bigquery_insert( load_info = pipeline.run(resource(url=OWID_DISASTERS_URL)) print(load_info) - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py b/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py index a4ed9601a3..305c7d1f1a 100644 --- a/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py +++ b/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py @@ -21,7 +21,6 @@ __source_name__ = "spotify" import datetime # noqa: I251 -import os from dataclasses import dataclass, fields from pathlib import Path from typing import Any @@ -142,7 +141,6 @@ def lancedb_destination(items: TDataItems, table: TTableSchema) -> None: ) load_info = pipeline.run(spotify_shows()) - load_info.raise_on_failed_jobs() print(load_info) row_counts = pipeline.last_trace.last_normalize_info diff --git a/docs/examples/custom_naming/custom_naming.py b/docs/examples/custom_naming/custom_naming.py index e99e582213..74feeb13ec 100644 --- a/docs/examples/custom_naming/custom_naming.py +++ b/docs/examples/custom_naming/custom_naming.py @@ -46,8 +46,6 @@ # Extract, normalize, and load the data load_info = pipeline.run([{"StückId": 1}], table_name="Ausrüstung") print(load_info) - # make sure nothing failed - load_info.raise_on_failed_jobs() with pipeline.sql_client() as client: # NOTE: we quote case sensitive identifers with client.execute_query('SELECT "StückId" FROM "Ausrüstung"') as cur: @@ -66,12 +64,10 @@ # duckdb is case insensitive so tables and columns below would clash but sql_ci_no_collision prevents that data_1 = {"ItemID": 1, "itemid": "collides"} load_info = pipeline.run([data_1], table_name="BigData") - load_info.raise_on_failed_jobs() data_2 = {"1Data": 1, "_1data": "collides"} # use colliding table load_info = pipeline.run([data_2], table_name="bigdata") - load_info.raise_on_failed_jobs() with pipeline.sql_client() as client: from duckdb import DuckDBPyConnection diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py index fbc0686fb9..716009865e 100644 --- a/docs/examples/google_sheets/google_sheets.py +++ b/docs/examples/google_sheets/google_sheets.py @@ -100,6 +100,3 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: print(row_counts.keys()) 
assert row_counts["hidden_columns_merged_cells"] == 7 assert row_counts["blank_columns"] == 21 - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/incremental_loading/incremental_loading.py b/docs/examples/incremental_loading/incremental_loading.py index f1de4eecfe..90c5e93347 100644 --- a/docs/examples/incremental_loading/incremental_loading.py +++ b/docs/examples/incremental_loading/incremental_loading.py @@ -147,6 +147,3 @@ def get_pages( # check that stuff was loaded row_counts = pipeline.last_trace.last_normalize_info.row_counts assert row_counts["ticket_events"] == 17 - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/nested_data/nested_data.py b/docs/examples/nested_data/nested_data.py index 046e566efd..3ce462302d 100644 --- a/docs/examples/nested_data/nested_data.py +++ b/docs/examples/nested_data/nested_data.py @@ -128,9 +128,6 @@ def convert_mongo_objs(value: Any) -> Any: tables.pop("_dlt_pipeline_state") assert len(tables) == 7, pipeline.last_trace.last_normalize_info - # make sure nothing failed - load_info.raise_on_failed_jobs() - # The second method involves setting the max_table_nesting attribute directly # on the source data object. # This allows for dynamic control over the maximum nesting @@ -149,9 +146,6 @@ def convert_mongo_objs(value: Any) -> Any: tables.pop("_dlt_pipeline_state") assert len(tables) == 1, pipeline.last_trace.last_normalize_info - # make sure nothing failed - load_info.raise_on_failed_jobs() - # The third method involves applying data type hints to specific columns in the data. # In this case, we tell dlt that column 'cast' (containing a list of actors) # in 'movies' table should have type complex which means @@ -168,6 +162,3 @@ def convert_mongo_objs(value: Any) -> Any: tables = pipeline.last_trace.last_normalize_info.row_counts tables.pop("_dlt_pipeline_state") assert len(tables) == 6, pipeline.last_trace.last_normalize_info - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/parent_child_relationship/test_parent_child_relationship.py b/docs/examples/parent_child_relationship/test_parent_child_relationship.py index 95d1bade97..b2f1311195 100644 --- a/docs/examples/parent_child_relationship/test_parent_child_relationship.py +++ b/docs/examples/parent_child_relationship/test_parent_child_relationship.py @@ -10,9 +10,8 @@ keywords: [parent child relationship, parent key] --- -This example demonstrates handling data with parent-child relationships using -the `dlt` library. You learn how to integrate specific fields (e.g., primary, -foreign keys) from a parent record into each child record. +This example demonstrates handling data with parent-child relationships using the `dlt` library. +You learn how to integrate specific fields (e.g., primary, foreign keys) from a parent record into each child record. 
In this example, we'll explore how to: diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py index 5fbba98a21..76629fc612 100644 --- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py +++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py @@ -80,6 +80,3 @@ def pdf_to_text(file_item, separate_pages: bool = False): client = weaviate.Client("http://localhost:8080") # get text of all the invoices in InvoiceText class we just created above print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/postgres_to_postgres/postgres_to_postgres.py b/docs/examples/postgres_to_postgres/postgres_to_postgres.py index 3e88cb7ee8..aaebb224fd 100644 --- a/docs/examples/postgres_to_postgres/postgres_to_postgres.py +++ b/docs/examples/postgres_to_postgres/postgres_to_postgres.py @@ -214,9 +214,6 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"): assert row_counts["table_1"] == 9 assert row_counts["table_2"] == 9 - # make sure nothing failed - load_info.raise_on_failed_jobs() - if load_type == "replace": # 4. Load DuckDB local database into Postgres print("##################################### START DUCKDB LOAD ########") diff --git a/docs/examples/qdrant_zendesk/qdrant_zendesk.py b/docs/examples/qdrant_zendesk/qdrant_zendesk.py index 9b6fbee150..18eea002b3 100644 --- a/docs/examples/qdrant_zendesk/qdrant_zendesk.py +++ b/docs/examples/qdrant_zendesk/qdrant_zendesk.py @@ -175,9 +175,6 @@ def get_pages( print(load_info) - # make sure nothing failed - load_info.raise_on_failed_jobs() - # getting the authenticated Qdrant client to connect to your Qdrant database with pipeline.destination_client() as destination_client: from qdrant_client import QdrantClient @@ -194,6 +191,3 @@ def get_pages( ) assert len(response) <= 3 and len(response) > 0 - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/transformers/transformers.py b/docs/examples/transformers/transformers.py index 14d23de12d..ebf1f935ba 100644 --- a/docs/examples/transformers/transformers.py +++ b/docs/examples/transformers/transformers.py @@ -78,6 +78,3 @@ def species(pokemon_details): assert row_counts["pokemon"] == 20 assert row_counts["species"] == 20 assert "pokemon_list" not in row_counts - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/website/docs/general-usage/snippets/destination-snippets.py b/docs/website/docs/general-usage/snippets/destination-snippets.py index 3484d943a0..c1c0f745c5 100644 --- a/docs/website/docs/general-usage/snippets/destination-snippets.py +++ b/docs/website/docs/general-usage/snippets/destination-snippets.py @@ -94,7 +94,7 @@ def destination_instantiation_snippet() -> None: # here dependencies dependencies will be imported, secrets pulled and destination accessed # we pass bucket_url explicitly and expect credentials passed by config provider load_info = pipeline.load(destination=filesystem(bucket_url=bucket_url)) - load_info.raise_on_failed_jobs() + print(load_info) # @@@DLT_SNIPPET_END late_destination_access diff --git a/docs/website/docs/reference/performance_snippets/performance-snippets.py b/docs/website/docs/reference/performance_snippets/performance-snippets.py index 7fc0f2bce9..33c29eb681 100644 --- a/docs/website/docs/reference/performance_snippets/performance-snippets.py +++ 
b/docs/website/docs/reference/performance_snippets/performance-snippets.py @@ -179,9 +179,9 @@ async def _run_async(): loop.run_in_executor(executor, _run_pipeline, pipeline_1, async_table), loop.run_in_executor(executor, _run_pipeline, pipeline_2, defer_table), ) - # result contains two LoadInfo instances - results[0].raise_on_failed_jobs() - results[1].raise_on_failed_jobs() + # results contains two LoadInfo instances + print("pipeline_1", results[0]) + print("pipeline_2", results[1]) # load data asyncio.run(_run_async()) diff --git a/docs/website/docs/running-in-production/running.md b/docs/website/docs/running-in-production/running.md index cc089a1393..0c010332e7 100644 --- a/docs/website/docs/running-in-production/running.md +++ b/docs/website/docs/running-in-production/running.md @@ -259,9 +259,19 @@ def check(ex: Exception): ### Failed jobs -If any job in the package **fail terminally** it will be moved to `failed_jobs` folder and assigned -such status. By default **no exception is raised** and other jobs will be processed and completed. -You may inspect if the failed jobs are present by checking the load info as follows: +If any job in the package **fails terminally** it will be moved to the `failed_jobs` folder and assigned +such status. +By default, **an exception is raised** on the first failed job and the load package is aborted with `LoadClientJobFailed` (terminal exception). +Such a package will be completed, but its load id is not added to the `_dlt_loads` table. +All the jobs that were running in parallel are completed before raising. The dlt state, if present, will not be visible to `dlt`. +Here is an example `config.toml` to disable this behavior: + +```toml +# I hope you know what you are doing by setting this to false +load.raise_on_failed_jobs=false +``` + +If you prefer dlt not to raise a terminal exception on failed jobs, you can manually check for failed jobs and raise an exception by checking the load info as follows: ```py # returns True if there are failed jobs in any of the load packages @@ -270,18 +280,6 @@ print(load_info.has_failed_jobs) load_info.raise_on_failed_jobs() ``` -You may also abort the load package with `LoadClientJobFailed` (terminal exception) on a first -failed job. Such package is will be completed but its load id is not added to the -`_dlt_loads` table. All the jobs that were running in parallel are completed before raising. The dlt -state, if present, will not be visible to `dlt`. Here's example `config.toml` to enable this option: - -```toml -# you should really load just one job at a time to get the deterministic behavior -load.workers=1 -# I hope you know what you are doing by setting this to true -load.raise_on_failed_jobs=true -``` - :::caution Note that certain write dispositions will irreversibly modify your data 1. `replace` write disposition with the default `truncate-and-insert` [strategy](../general-usage/full-loading.md) will truncate tables before loading. 
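With this default in place, a terminally failed job surfaces as a `PipelineStepFailed` raised from `pipeline.run()`, with `LoadClientJobFailed` as its cause (the updated tests below rely on exactly this). A minimal sketch of handling it, using the `dummy` destination and its `FAIL_PROB` setting purely for illustration:

```py
import os

import dlt
from dlt.load.exceptions import LoadClientJobFailed
from dlt.pipeline.exceptions import PipelineStepFailed

# force the dummy destination to fail every job so the abort path is triggered
os.environ["FAIL_PROB"] = "1.0"

pipeline = dlt.pipeline(pipeline_name="failed_jobs_demo", destination="dummy")
try:
    pipeline.run([1, 2, 3], table_name="numbers")
except PipelineStepFailed as step_ex:
    # the load step aborted the package on the first terminally failed job
    assert isinstance(step_ex.__cause__, LoadClientJobFailed)
    print(step_ex.__cause__.failed_message)
```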
diff --git a/tests/cli/common/test_cli_invoke.py b/tests/cli/common/test_cli_invoke.py index 0c6be1ea24..77c003a5c9 100644 --- a/tests/cli/common/test_cli_invoke.py +++ b/tests/cli/common/test_cli_invoke.py @@ -63,7 +63,7 @@ def test_invoke_pipeline(script_runner: ScriptRunner) -> None: shutil.copytree("tests/cli/cases/deploy_pipeline", TEST_STORAGE_ROOT, dirs_exist_ok=True) with set_working_dir(TEST_STORAGE_ROOT): - with custom_environ({"COMPETED_PROB": "1.0", DLT_DATA_DIR: get_dlt_data_dir()}): + with custom_environ({"COMPLETED_PROB": "1.0", DLT_DATA_DIR: get_dlt_data_dir()}): venv = Venv.restore_current() venv.run_script("dummy_pipeline.py") # we check output test_pipeline_command else diff --git a/tests/cli/test_pipeline_command.py b/tests/cli/test_pipeline_command.py index 664646e2e5..a5bb0ca467 100644 --- a/tests/cli/test_pipeline_command.py +++ b/tests/cli/test_pipeline_command.py @@ -170,6 +170,8 @@ def test_pipeline_command_failed_jobs(repo_dir: str, project_files: FileStorage) # now run the pipeline os.environ["FAIL_PROB"] = "1.0" + # let it fail without an exception + os.environ["RAISE_ON_FAILED_JOBS"] = "false" venv = Venv.restore_current() try: print(venv.run_script("chess_pipeline.py")) diff --git a/tests/destinations/conftest.py b/tests/destinations/conftest.py index 89f7cdffed..286b665adc 100644 --- a/tests/destinations/conftest.py +++ b/tests/destinations/conftest.py @@ -5,3 +5,4 @@ wipe_pipeline, duckdb_pipeline_location, ) +from tests.common.configuration.utils import environment \ No newline at end of file diff --git a/tests/destinations/test_custom_destination.py b/tests/destinations/test_custom_destination.py index 6ebf7f6ef3..30d06e8b96 100644 --- a/tests/destinations/test_custom_destination.py +++ b/tests/destinations/test_custom_destination.py @@ -1,8 +1,7 @@ -from typing import List, Tuple, Dict, Union, cast +from typing import List, Tuple, Dict import dlt import pytest -import pytest import os import inspect @@ -498,17 +497,15 @@ def sink_func_with_spec( # call fails because `my_predefined_val` is required part of spec, even if not injected with pytest.raises(ConfigFieldMissingException): - info = dlt.pipeline("sink_test", destination=sink_func_with_spec(), dev_mode=True).run( + dlt.pipeline("sink_test", destination=sink_func_with_spec(), dev_mode=True).run( [1, 2, 3], table_name="items" ) - info.raise_on_failed_jobs() # call happens now os.environ["MY_PREDEFINED_VAL"] = "VAL" - info = dlt.pipeline("sink_test", destination=sink_func_with_spec(), dev_mode=True).run( + dlt.pipeline("sink_test", destination=sink_func_with_spec(), dev_mode=True).run( [1, 2, 3], table_name="items" ) - info.raise_on_failed_jobs() # check destination with additional config params @dlt.destination(spec=MyDestinationSpec) diff --git a/tests/destinations/test_destination_name_and_config.py b/tests/destinations/test_destination_name_and_config.py index 1e432a7803..efaaafcfeb 100644 --- a/tests/destinations/test_destination_name_and_config.py +++ b/tests/destinations/test_destination_name_and_config.py @@ -1,6 +1,5 @@ import os import pytest -import posixpath import dlt from dlt.common.configuration.exceptions import ConfigFieldMissingException @@ -9,7 +8,6 @@ from dlt.common.storages import FilesystemConfiguration from dlt.destinations import duckdb, dummy, filesystem -from tests.common.configuration.utils import environment from tests.utils import TEST_STORAGE_ROOT @@ -71,7 +69,6 @@ def test_preserve_destination_instance() -> None: os.environ["COMPLETED_PROB"] = "1.0" load_info = p.run([1, 
2, 3], table_name="table", dataset_name="dataset") - load_info.raise_on_failed_jobs() # destination and staging stay the same assert destination_id == id(p.destination) assert staging_id == id(p.staging) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index a2a3fbafac..0a0de75987 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -207,8 +207,8 @@ def some_data(created_at=dlt.sources.incremental("created_at")): pipeline_name=uniq_id(), destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")), ) - p.run(some_data()).raise_on_failed_jobs() - p.run(some_data()).raise_on_failed_jobs() + p.run(some_data()) + p.run(some_data()) with p.sql_client() as c: with c.execute_query("SELECT created_at, id FROM some_data order by created_at, id") as cur: @@ -248,8 +248,8 @@ def some_data(created_at=dlt.sources.incremental("created_at")): pipeline_name=uniq_id(), destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")), ) - p.run(some_data()).raise_on_failed_jobs() - p.run(some_data()).raise_on_failed_jobs() + p.run(some_data()) + p.run(some_data()) with p.sql_client() as c: with c.execute_query("SELECT created_at, id FROM some_data order by created_at, id") as cur: @@ -455,7 +455,7 @@ def some_data(created_at=dlt.sources.incremental("created_at")): pipeline_name=uniq_id(), destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")), ) - p.run(some_data()).raise_on_failed_jobs() + p.run(some_data()) with p.sql_client() as c: with c.execute_query( @@ -1325,12 +1325,11 @@ def some_data( ): yield from source_items - info = p.run(some_data()) - info.raise_on_failed_jobs() + p.run(some_data()) norm_info = p.last_trace.last_normalize_info assert norm_info.row_counts["some_data"] == 20 # load incrementally - info = p.run(some_data()) + p.run(some_data()) norm_info = p.last_trace.last_normalize_info assert "some_data" not in norm_info.row_counts @@ -2560,11 +2559,8 @@ def test_source(): pip_1_name = "test_pydantic_columns_validator_" + uniq_id() pipeline = dlt.pipeline(pipeline_name=pip_1_name, destination="duckdb") - info = pipeline.run(test_source()) - info.raise_on_failed_jobs() - - info = pipeline.run(test_source_incremental()) - info.raise_on_failed_jobs() + pipeline.run(test_source()) + pipeline.run(test_source_incremental()) # verify that right steps are at right place steps = test_source().table_name._pipe._steps diff --git a/tests/load/bigquery/test_bigquery_streaming_insert.py b/tests/load/bigquery/test_bigquery_streaming_insert.py index c950a46f91..20d07c7c76 100644 --- a/tests/load/bigquery/test_bigquery_streaming_insert.py +++ b/tests/load/bigquery/test_bigquery_streaming_insert.py @@ -1,7 +1,10 @@ import pytest import dlt +from dlt.common.pipeline import LoadInfo from dlt.destinations.adapters import bigquery_adapter +from dlt.load.exceptions import LoadClientJobFailed +from dlt.pipeline.exceptions import PipelineStepFailed from tests.pipeline.utils import assert_load_info @@ -40,13 +43,16 @@ def test_resource(): test_resource.apply_hints(additional_table_hints={"x-insert-api": "streaming"}) pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery") - info = pipe.run(test_resource) + with pytest.raises(PipelineStepFailed) as pip_ex: + pipe.run(test_resource) + assert isinstance(pip_ex.value.step_info, LoadInfo) + assert pip_ex.value.step_info.has_failed_jobs # pick the failed job - failed_job = info.load_packages[0].jobs["failed_jobs"][0] + assert 
isinstance(pip_ex.value.__cause__, LoadClientJobFailed) assert ( """BigQuery streaming insert can only be used with `append`""" """ write_disposition, while the given resource has `merge`.""" - ) in failed_job.failed_message + ) in pip_ex.value.__cause__.failed_message def test_bigquery_streaming_nested_data(): diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py index 31199bd8e4..0a6aa234ea 100644 --- a/tests/load/bigquery/test_bigquery_table_builder.py +++ b/tests/load/bigquery/test_bigquery_table_builder.py @@ -38,7 +38,6 @@ drop_active_pipeline_data, TABLE_UPDATE, sequence_generator, - empty_schema, ) # mark all tests as essential, do not remove @@ -1020,8 +1019,7 @@ def sources() -> List[DltResource]: dlt.resource([{"col2": "ABC"}], name="hints"), table_description="Once upon a time a small table got hinted twice.", ) - info = pipeline.run(mod_hints) - info.raise_on_failed_jobs() + pipeline.run(mod_hints) assert pipeline.last_trace.last_normalize_info.row_counts["hints"] == 1 with pipeline.sql_client() as c: diff --git a/tests/load/conftest.py b/tests/load/conftest.py index a110b1198f..76a7248e5b 100644 --- a/tests/load/conftest.py +++ b/tests/load/conftest.py @@ -2,7 +2,13 @@ import pytest from typing import Iterator -from tests.load.utils import ALL_BUCKETS, DEFAULT_BUCKETS, WITH_GDRIVE_BUCKETS, drop_pipeline +from tests.load.utils import ( + ALL_BUCKETS, + DEFAULT_BUCKETS, + WITH_GDRIVE_BUCKETS, + drop_pipeline, + empty_schema, +) from tests.utils import preserve_environ, patch_home_dir diff --git a/tests/load/pipeline/test_csv_loading.py b/tests/load/pipeline/test_csv_loading.py index 6a2be2eb40..a2cc786915 100644 --- a/tests/load/pipeline/test_csv_loading.py +++ b/tests/load/pipeline/test_csv_loading.py @@ -92,7 +92,6 @@ def test_custom_csv_no_header( table_name="no_header", loader_file_format=file_format, ) - info.raise_on_failed_jobs() print(info) assert_only_table_columns(pipeline, "no_header", [col["name"] for col in columns]) rows = load_tables_to_dicts(pipeline, "no_header") @@ -114,6 +113,8 @@ def test_custom_csv_no_header( ids=lambda x: x.name, ) def test_custom_wrong_header(destination_config: DestinationTestConfiguration) -> None: + # do not raise on failed jobs + os.environ["RAISE_ON_FAILED_JOBS"] = "false" csv_format = CsvFormatConfiguration(delimiter="|", include_header=True) # apply to collected config pipeline = destination_config.setup_pipeline("postgres_" + uniq_id(), dev_mode=True) diff --git a/tests/load/pipeline/test_databricks_pipeline.py b/tests/load/pipeline/test_databricks_pipeline.py index 73868f4e97..9d152bb099 100644 --- a/tests/load/pipeline/test_databricks_pipeline.py +++ b/tests/load/pipeline/test_databricks_pipeline.py @@ -20,6 +20,9 @@ def test_databricks_external_location(destination_config: DestinationTestConfiguration) -> None: # do not interfere with state os.environ["RESTORE_FROM_DESTINATION"] = "False" + # let the package complete even with failed jobs + os.environ["RAISE_ON_FAILED_JOBS"] = "false" + dataset_name = "test_databricks_external_location" + uniq_id() from dlt.destinations import databricks, filesystem diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index f6ddd79b99..0e44c754e7 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -1,5 +1,5 @@ import os -from typing import Any, Iterator, Dict, Any, List +from typing import Iterator, Dict, Any, List from unittest import mock from itertools import chain 
@@ -415,7 +415,7 @@ def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConf attached.extract(droppable_source()) # TODO: individual steps cause pipeline.run() never raises attached.normalize() - attached.load(raise_on_failed_jobs=True) + attached.load() @pytest.mark.parametrize( diff --git a/tests/load/pipeline/test_duckdb.py b/tests/load/pipeline/test_duckdb.py index 1129523318..bc823a1857 100644 --- a/tests/load/pipeline/test_duckdb.py +++ b/tests/load/pipeline/test_duckdb.py @@ -31,17 +31,15 @@ def test_duck_case_names(destination_config: DestinationTestConfiguration) -> No os.environ["SCHEMA__NAMING"] = "duck_case" pipeline = destination_config.setup_pipeline("test_duck_case_names") # create tables and columns with emojis and other special characters - info = pipeline.run( + pipeline.run( airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock"), **destination_config.run_kwargs, ) - info.raise_on_failed_jobs() - info = pipeline.run( + pipeline.run( [{"🐾Feet": 2, "1+1": "two", "\nhey": "value"}], table_name="🦚Peacocks🦚", **destination_config.run_kwargs, ) - info.raise_on_failed_jobs() table_counts = load_table_counts( pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()] ) @@ -104,7 +102,7 @@ def test_duck_precision_types(destination_config: DestinationTestConfiguration) "col5_int": 2**64 // 2 - 1, } ] - info = pipeline.run( + pipeline.run( row, table_name="row", **destination_config.run_kwargs, @@ -112,7 +110,6 @@ def test_duck_precision_types(destination_config: DestinationTestConfiguration) TABLE_UPDATE_ALL_TIMESTAMP_PRECISIONS + TABLE_UPDATE_ALL_INT_PRECISIONS ), ) - info.raise_on_failed_jobs() with pipeline.sql_client() as client: table = client.native_connection.sql("SELECT * FROM row").arrow() @@ -167,14 +164,13 @@ class EventV1(BaseModel): event = {"ver": 1, "id": "id1", "details": {"detail_id": "detail_1", "is_complete": False}} - info = pipeline.run( + pipeline.run( [event], table_name="events", columns=EventV1, loader_file_format="parquet", schema_contract="evolve", ) - info.raise_on_failed_jobs() print(pipeline.default_schema.to_pretty_yaml()) # we will use a different pipeline with a separate schema but writing to the same dataset and to the same table @@ -200,14 +196,13 @@ class EventV2(BaseModel): "test_new_nested_prop_parquet_2", dataset_name="test_dataset" ) pipeline.destination = duck_factory # type: ignore - info = pipeline.run( + pipeline.run( [event], table_name="events", columns=EventV2, loader_file_format="parquet", schema_contract="evolve", ) - info.raise_on_failed_jobs() print(pipeline.default_schema.to_pretty_yaml()) @@ -220,8 +215,7 @@ def test_jsonl_reader(destination_config: DestinationTestConfiguration) -> None: pipeline = destination_config.setup_pipeline("test_jsonl_reader") data = [{"a": 1, "b": 2}, {"a": 1}] - info = pipeline.run(data, table_name="data", loader_file_format="jsonl") - info.raise_on_failed_jobs() + pipeline.run(data, table_name="data", loader_file_format="jsonl") @pytest.mark.parametrize( @@ -245,9 +239,8 @@ def _get_shuffled_events(repeat: int = 1): os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "200" pipeline = destination_config.setup_pipeline("test_provoke_parallel_parquet_same_table") + pipeline.run(_get_shuffled_events(50), **destination_config.run_kwargs) - info = pipeline.run(_get_shuffled_events(50), **destination_config.run_kwargs) - info.raise_on_failed_jobs() assert_data_table_counts( pipeline, expected_counts={ diff --git a/tests/load/pipeline/test_filesystem_pipeline.py 
b/tests/load/pipeline/test_filesystem_pipeline.py index 98d6cce294..184a8bafa4 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -44,7 +44,6 @@ def test_pipeline_merge_write_disposition(default_buckets_env: str) -> None: """Run pipeline twice with merge write disposition Regardless wether primary key is set or not, filesystem appends """ - import pyarrow.parquet as pq # Module is evaluated by other tests os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True" @@ -102,7 +101,6 @@ def test_pipeline_csv_filesystem_destination(item_type: TestDataItemFormat) -> N item, rows, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True) info = pipeline.run(item, table_name="table", loader_file_format="csv") - info.raise_on_failed_jobs() job = info.load_packages[0].jobs["completed_jobs"][0].file_path assert job.endswith("csv") with open(job, "r", encoding="utf-8", newline="") as f: @@ -128,7 +126,6 @@ def test_csv_options(item_type: TestDataItemFormat) -> None: item, rows, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True) info = pipeline.run(item, table_name="table", loader_file_format="csv") - info.raise_on_failed_jobs() job = info.load_packages[0].jobs["completed_jobs"][0].file_path assert job.endswith("csv") with open(job, "r", encoding="utf-8", newline="") as f: @@ -157,7 +154,6 @@ def test_csv_quoting_style(item_type: TestDataItemFormat) -> None: item, _, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True) info = pipeline.run(item, table_name="table", loader_file_format="csv") - info.raise_on_failed_jobs() job = info.load_packages[0].jobs["completed_jobs"][0].file_path assert job.endswith("csv") with open(job, "r", encoding="utf-8", newline="") as f: @@ -693,7 +689,6 @@ def test_delta_table_empty_source( Tests both empty Arrow table and `dlt.mark.materialize_table_schema()`. 
""" - from dlt.common.libs.pyarrow import pyarrow as pa from dlt.common.libs.deltalake import ensure_delta_compatible_arrow_data, get_delta_tables from tests.pipeline.utils import users_materialize_table_schema diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index edc210800c..73bcbed035 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -9,7 +9,6 @@ from dlt.common import json, sleep from dlt.common.pipeline import SupportsPipeline from dlt.common.destination import Destination -from dlt.common.destination.exceptions import DestinationHasFailedJobs from dlt.common.destination.reference import WithStagingDataset from dlt.common.schema.exceptions import CannotCoerceColumnException from dlt.common.schema.schema import Schema @@ -24,6 +23,7 @@ from dlt.destinations.job_client_impl import SqlJobClientBase from dlt.extract.exceptions import ResourceNameMissing from dlt.extract.source import DltSource +from dlt.load.exceptions import LoadClientJobFailed from dlt.pipeline.exceptions import ( CannotRestorePipelineException, PipelineConfigMissing, @@ -381,6 +381,8 @@ def extended_rows(): # lets violate unique constraint on postgres, redshift and BQ ignore unique indexes if destination_config.destination == "postgres": + # let it complete even with PK violation (which is a teminal error) + os.environ["RAISE_ON_FAILED_JOBS"] = "false" assert p.dataset_name == dataset_name err_info = p.run( source(1).with_resources("simple_rows"), diff --git a/tests/load/pipeline/test_redshift.py b/tests/load/pipeline/test_redshift.py index 7c26ac17a9..ca98a653f9 100644 --- a/tests/load/pipeline/test_redshift.py +++ b/tests/load/pipeline/test_redshift.py @@ -3,7 +3,9 @@ import pytest import dlt +from dlt.common.destination.exceptions import UnsupportedDataType from dlt.common.utils import uniq_id +from dlt.pipeline.exceptions import PipelineStepFailed from tests.load.utils import destinations_configs, DestinationTestConfiguration from tests.cases import table_update_and_row, assert_all_data_types_row from tests.pipeline.utils import assert_load_info @@ -32,11 +34,10 @@ def my_resource() -> Iterator[Any]: def my_source() -> Any: return my_resource - info = pipeline.run(my_source(), **destination_config.run_kwargs) - - assert info.has_failed_jobs - - assert ( - "Redshift cannot load TIME columns from" - in info.load_packages[0].jobs["failed_jobs"][0].failed_message - ) + with pytest.raises(PipelineStepFailed) as pip_ex: + pipeline.run(my_source(), **destination_config.run_kwargs) + assert isinstance(pip_ex.value.__cause__, UnsupportedDataType) + if destination_config.file_format == "parquet": + assert pip_ex.value.__cause__.data_type == "time" + else: + assert pip_ex.value.__cause__.data_type in ("time", "binary") diff --git a/tests/load/pipeline/test_snowflake_pipeline.py b/tests/load/pipeline/test_snowflake_pipeline.py index 87c6f337a1..31a62b6409 100644 --- a/tests/load/pipeline/test_snowflake_pipeline.py +++ b/tests/load/pipeline/test_snowflake_pipeline.py @@ -9,6 +9,8 @@ from dlt.common.utils import uniq_id from dlt.destinations.exceptions import DatabaseUndefinedRelation +from dlt.load.exceptions import LoadClientJobFailed +from dlt.pipeline.exceptions import PipelineStepFailed from tests.cases import assert_all_data_types_row from tests.load.pipeline.test_pipelines import simple_nested_pipeline from tests.load.snowflake.test_snowflake_client import QUERY_TAG @@ -101,10 +103,10 @@ def 
test_snowflake_custom_stage(destination_config: DestinationTestConfiguration """Using custom stage name instead of the table stage""" os.environ["DESTINATION__SNOWFLAKE__STAGE_NAME"] = "my_non_existing_stage" pipeline, data = simple_nested_pipeline(destination_config, f"custom_stage_{uniq_id()}", False) - info = pipeline.run(data(), **destination_config.run_kwargs) - with pytest.raises(DestinationHasFailedJobs) as f_jobs: - info.raise_on_failed_jobs() - assert "MY_NON_EXISTING_STAGE" in f_jobs.value.failed_jobs[0].failed_message + with pytest.raises(PipelineStepFailed) as f_jobs: + pipeline.run(data(), **destination_config.run_kwargs) + assert isinstance(f_jobs.value.__cause__, LoadClientJobFailed) + assert "MY_NON_EXISTING_STAGE" in f_jobs.value.__cause__.failed_message drop_active_pipeline_data() diff --git a/tests/load/qdrant/test_restore_state.py b/tests/load/qdrant/test_restore_state.py index 31bc725d24..63f575f6ec 100644 --- a/tests/load/qdrant/test_restore_state.py +++ b/tests/load/qdrant/test_restore_state.py @@ -1,11 +1,9 @@ -from typing import TYPE_CHECKING import pytest from qdrant_client import models import dlt from tests.load.utils import destinations_configs, DestinationTestConfiguration -from dlt.common.destination.reference import JobClientBase, WithStateSync from dlt.destinations.impl.qdrant.qdrant_job_client import QdrantClient @@ -37,7 +35,7 @@ def dummy_table(): pipeline.extract(dummy_table) pipeline.normalize() - info = pipeline.load(raise_on_failed_jobs=True) + info = pipeline.load() client: QdrantClient with pipeline.destination_client() as client: # type: ignore[assignment] diff --git a/tests/load/sources/sql_database/test_sql_database_source.py b/tests/load/sources/sql_database/test_sql_database_source.py index b0f7fdf1de..dc2ad533ce 100644 --- a/tests/load/sources/sql_database/test_sql_database_source.py +++ b/tests/load/sources/sql_database/test_sql_database_source.py @@ -1,7 +1,5 @@ import os -import re from copy import deepcopy -from datetime import datetime # noqa: I251 from typing import Any, Callable, cast, List, Optional, Set import pytest @@ -505,7 +503,7 @@ def test_all_types_no_precision_hints( source.resources[table_name].add_map(unwrap_json_connector_x("json_col")) pipeline.extract(source) pipeline.normalize(loader_file_format="parquet") - pipeline.load().raise_on_failed_jobs() + pipeline.load() schema = pipeline.default_schema # print(pipeline.default_schema.to_pretty_yaml()) @@ -605,7 +603,7 @@ def test_deferred_reflect_in_source( pipeline.extract(source) # use insert values to convert parquet into INSERT pipeline.normalize(loader_file_format="insert_values") - pipeline.load().raise_on_failed_jobs() + pipeline.load() precision_table = pipeline.default_schema.get_table("has_precision") assert_precision_columns( precision_table["columns"], @@ -661,7 +659,7 @@ def test_deferred_reflect_in_resource( pipeline.extract(table) # use insert values to convert parquet into INSERT pipeline.normalize(loader_file_format="insert_values") - pipeline.load().raise_on_failed_jobs() + pipeline.load() precision_table = pipeline.default_schema.get_table("has_precision") assert_precision_columns( precision_table["columns"], @@ -954,7 +952,7 @@ def dummy_source(): pipeline.extract(source) pipeline.normalize() - pipeline.load().raise_on_failed_jobs() + pipeline.load() channel_rows = load_tables_to_dicts(pipeline, "chat_channel")["chat_channel"] assert channel_rows and all(row["active"] for row in channel_rows) diff --git a/tests/load/test_dummy_client.py 
b/tests/load/test_dummy_client.py index f0d0f8cdde..26b90e5a0d 100644 --- a/tests/load/test_dummy_client.py +++ b/tests/load/test_dummy_client.py @@ -9,7 +9,7 @@ from dlt.common.exceptions import TerminalException, TerminalValueError from dlt.common.storages import FileStorage, PackageStorage, ParsedLoadJobFileName from dlt.common.storages.configuration import FilesystemConfiguration -from dlt.common.storages.load_package import LoadJobInfo, TPackageJobState +from dlt.common.storages.load_package import TPackageJobState from dlt.common.storages.load_storage import JobFileFormatUnsupported from dlt.common.destination.reference import RunnableLoadJob, TDestination from dlt.common.schema.utils import ( @@ -24,6 +24,7 @@ from dlt.destinations.impl.dummy.configuration import DummyClientConfiguration from dlt.load import Load +from dlt.load.configuration import LoaderConfiguration from dlt.load.exceptions import ( LoadClientJobFailed, LoadClientJobRetry, @@ -107,14 +108,11 @@ def test_unsupported_write_disposition() -> None: schema.get_table("event_user")["write_disposition"] = "skip" # write back schema load.load_storage.normalized_packages.save_schema(load_id, schema) - with ThreadPoolExecutor() as pool: - load.run(pool) - # job with unsupported write disp. is failed - failed_job = load.load_storage.loaded_packages.list_failed_jobs(load_id)[0] - failed_message = load.load_storage.loaded_packages.get_job_failed_message( - load_id, ParsedLoadJobFileName.parse(failed_job) - ) - assert "LoadClientUnsupportedWriteDisposition" in failed_message + with pytest.raises(LoadClientJobFailed) as e: + with ThreadPoolExecutor() as pool: + load.run(pool) + + assert "LoadClientUnsupportedWriteDisposition" in e.value.failed_message def test_big_loadpackages() -> None: @@ -228,10 +226,19 @@ def test_spool_job_failed() -> None: started_files = load.load_storage.normalized_packages.list_started_jobs(load_id) assert len(started_files) == 0 - # test the whole flow - load = setup_loader(client_config=DummyClientConfiguration(fail_prob=1.0)) + # test the whole + loader_config = LoaderConfiguration( + raise_on_failed_jobs=False, + workers=1, + pool_type="none", + ) + load = setup_loader( + client_config=DummyClientConfiguration(fail_prob=1.0), + loader_config=loader_config, + ) load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES) run_all(load) + package_info = load.load_storage.get_load_package_info(load_id) assert package_info.state == "loaded" # all jobs failed @@ -246,8 +253,6 @@ def test_spool_job_failed() -> None: def test_spool_job_failed_terminally_exception_init() -> None: - # this config fails job on start - os.environ["LOAD__RAISE_ON_FAILED_JOBS"] = "true" load = setup_loader(client_config=DummyClientConfiguration(fail_terminally_in_init=True)) load_id, _ = prepare_load_package(load.load_storage, NORMALIZED_FILES) with patch.object(dummy_impl.DummyClient, "complete_load") as complete_load: @@ -269,8 +274,6 @@ def test_spool_job_failed_terminally_exception_init() -> None: def test_spool_job_failed_transiently_exception_init() -> None: - # this config fails job on start - os.environ["LOAD__RAISE_ON_FAILED_JOBS"] = "true" load = setup_loader(client_config=DummyClientConfiguration(fail_transiently_in_init=True)) load_id, _ = prepare_load_package(load.load_storage, NORMALIZED_FILES) with patch.object(dummy_impl.DummyClient, "complete_load") as complete_load: @@ -293,8 +296,6 @@ def test_spool_job_failed_transiently_exception_init() -> None: def test_spool_job_failed_exception_complete() -> 
None: - # this config fails job on start - os.environ["LOAD__RAISE_ON_FAILED_JOBS"] = "true" load = setup_loader(client_config=DummyClientConfiguration(fail_prob=1.0)) load_id, _ = prepare_load_package(load.load_storage, NORMALIZED_FILES) with pytest.raises(LoadClientJobFailed) as py_ex: @@ -520,7 +521,10 @@ def test_failed_loop() -> None: delete_completed_jobs=True, client_config=DummyClientConfiguration(fail_prob=1.0) ) # actually not deleted because one of the jobs failed - assert_complete_job(load, should_delete_completed=False) + with pytest.raises(LoadClientJobFailed) as e: + assert_complete_job(load, should_delete_completed=False) + + assert "a random fail occurred" in e.value.failed_message # two failed jobs assert len(dummy_impl.JOBS) == 2 assert list(dummy_impl.JOBS.values())[0].state() == "failed" @@ -535,7 +539,10 @@ def test_failed_loop_followup_jobs() -> None: client_config=DummyClientConfiguration(fail_prob=1.0, create_followup_jobs=True), ) # actually not deleted because one of the jobs failed - assert_complete_job(load, should_delete_completed=False) + with pytest.raises(LoadClientJobFailed) as e: + assert_complete_job(load, should_delete_completed=False) + + assert "a random fail occurred" in e.value.failed_message # followup jobs were not started assert len(dummy_impl.JOBS) == 2 assert len(dummy_impl.CREATED_FOLLOWUP_JOBS) == 0 @@ -1045,6 +1052,7 @@ def run_all(load: Load) -> None: def setup_loader( delete_completed_jobs: bool = False, client_config: DummyClientConfiguration = None, + loader_config: LoaderConfiguration = None, filesystem_staging: bool = False, ) -> Load: # reset jobs for a test @@ -1078,6 +1086,7 @@ def setup_loader( return Load( destination, initial_client_config=client_config, + config=loader_config, staging_destination=staging, # type: ignore[arg-type] initial_staging_client_config=staging_system_config, ) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index ec44e1c4db..fb2b2d70f6 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -61,7 +61,7 @@ def bot_events(): pipeline_name = f"test_max_table_nesting_{nesting_level}_{expected_num_tables}" pipeline = dlt.pipeline( pipeline_name=pipeline_name, - destination=dummy(timeout=0.1), + destination=dummy(timeout=0.1, completed_prob=1), dev_mode=True, ) @@ -168,7 +168,7 @@ def some_data(): pipeline_name = "test_different_table_nesting_levels" pipeline = dlt.pipeline( pipeline_name=pipeline_name, - destination=dummy(timeout=0.1), + destination=dummy(timeout=0.1, completed_prob=1), dev_mode=True, ) diff --git a/tests/pipeline/conftest.py b/tests/pipeline/conftest.py index f6c47e35b1..2411999dac 100644 --- a/tests/pipeline/conftest.py +++ b/tests/pipeline/conftest.py @@ -4,5 +4,7 @@ patch_home_dir, wipe_pipeline, duckdb_pipeline_location, + test_storage, ) +from tests.common.configuration.utils import environment, toml_providers from tests.pipeline.utils import drop_dataset_from_env diff --git a/tests/pipeline/test_arrow_sources.py b/tests/pipeline/test_arrow_sources.py index 4cdccb1e34..243fe95ef9 100644 --- a/tests/pipeline/test_arrow_sources.py +++ b/tests/pipeline/test_arrow_sources.py @@ -2,8 +2,6 @@ from typing import Any import pytest import pandas as pd -import os -import io import pyarrow as pa import dlt @@ -236,7 +234,7 @@ def test_load_arrow_vary_schema(item_type: TPythonTableFormat) -> None: pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="duckdb") item, _, _ = arrow_table_all_data_types(item_type, 
 include_not_normalized_name=False)
-    pipeline.run(item, table_name="data").raise_on_failed_jobs()
+    pipeline.run(item, table_name="data")
 
     item, _, _ = arrow_table_all_data_types(item_type, include_not_normalized_name=False)
     # remove int column
@@ -246,7 +244,7 @@ def test_load_arrow_vary_schema(item_type: TPythonTableFormat) -> None:
     names = item.schema.names
     names.remove("int")
     item = item.select(names)
-    pipeline.run(item, table_name="data").raise_on_failed_jobs()
+    pipeline.run(item, table_name="data")
 
 
 @pytest.mark.parametrize("item_type", ["pandas", "arrow-table", "arrow-batch"])
@@ -310,10 +308,10 @@ def some_data():
     assert schema.tables["some_data"]["columns"]["_dlt_id"]["data_type"] == "text"
     assert schema.tables["some_data"]["columns"]["_dlt_load_id"]["data_type"] == "text"
 
-    pipeline.load().raise_on_failed_jobs()
+    pipeline.load()
 
     # should be able to load again
-    pipeline.run(some_data()).raise_on_failed_jobs()
+    pipeline.run(some_data())
 
     # should be able to load arrow without a column
     try:
@@ -322,12 +320,12 @@ def some_data():
     names = item.schema.names
     names.remove("int")
     item = item.select(names)
-    pipeline.run(item, table_name="some_data").raise_on_failed_jobs()
+    pipeline.run(item, table_name="some_data")
 
     # should be able to load arrow with a new column
     item, records, _ = arrow_table_all_data_types(item_type, num_rows=200)
     item = item.append_column("static_int", [[0] * 200])
-    pipeline.run(item, table_name="some_data").raise_on_failed_jobs()
+    pipeline.run(item, table_name="some_data")
 
     schema = pipeline.default_schema
     assert schema.tables["some_data"]["columns"]["static_int"]["data_type"] == "bigint"
@@ -380,8 +378,7 @@ def _to_item(table: Any) -> Any:
     assert normalize_info.row_counts["table"] == 5432 * 3
 
     # load to duckdb
-    load_info = pipeline.load()
-    load_info.raise_on_failed_jobs()
+    pipeline.load()
 
 
 @pytest.mark.parametrize("item_type", ["arrow-table", "pandas", "arrow-batch"])
@@ -423,7 +420,7 @@ def _to_item(table: Any) -> Any:
         shuffled_names.append("binary")
     assert actual_tbl.schema.names == shuffled_names
 
-    pipeline.load().raise_on_failed_jobs()
+    pipeline.load()
 
 
 @pytest.mark.parametrize("item_type", ["arrow-table", "pandas", "arrow-batch"])
@@ -475,7 +472,7 @@ def _to_item(table: Any) -> Any:
     assert len(actual_tbl) == 5432 * 3
     assert actual_tbl.schema.names == shuffled_names
 
-    pipeline.load().raise_on_failed_jobs()
+    pipeline.load()
 
 
 @pytest.mark.parametrize("item_type", ["pandas", "arrow-table", "arrow-batch"])
@@ -543,11 +540,11 @@ def test_import_file_with_arrow_schema() -> None:
 
     # columns should be created from empty table
     import_file = "tests/load/cases/loading/header.jsonl"
-    info = pipeline.run(
+    pipeline.run(
         [dlt.mark.with_file_import(import_file, "jsonl", 2, hints=empty_table)],
         table_name="no_header",
     )
-    info.raise_on_failed_jobs()
+
     assert_only_table_columns(pipeline, "no_header", schema.names)
     rows = load_tables_to_dicts(pipeline, "no_header")
     assert len(rows["no_header"]) == 2
diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py
index ea8ed4550c..81a52abade 100644
--- a/tests/pipeline/test_dlt_versions.py
+++ b/tests/pipeline/test_dlt_versions.py
@@ -27,17 +27,14 @@
 from dlt.destinations.impl.duckdb.sql_client import DuckDbSqlClient
 
 from tests.pipeline.utils import airtable_emojis, load_table_counts
-from tests.utils import TEST_STORAGE_ROOT, test_storage
+from tests.utils import TEST_STORAGE_ROOT
 
 
 def test_simulate_default_naming_convention_change() -> None:
     # checks that (future) change in the naming convention won't affect existing pipelines
     pipeline = dlt.pipeline("simulated_snake_case", destination="duckdb")
     assert pipeline.naming.name() == "snake_case"
-    info = pipeline.run(
-        airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock")
-    )
-    info.raise_on_failed_jobs()
+    pipeline.run(airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock"))
     # normalized names
     assert pipeline.last_trace.last_normalize_info.row_counts["_schedule"] == 3
     assert "_schedule" in pipeline.default_schema.tables
@@ -51,19 +48,15 @@ def test_simulate_default_naming_convention_change() -> None:
     print(airtable_emojis().schema.naming.name())
 
     # run new and old pipelines
-    info = duck_pipeline.run(
+    duck_pipeline.run(
         airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock")
     )
-    info.raise_on_failed_jobs()
     print(duck_pipeline.last_trace.last_normalize_info.row_counts)
     assert duck_pipeline.last_trace.last_normalize_info.row_counts["📆 Schedule"] == 3
     assert "📆 Schedule" in duck_pipeline.default_schema.tables
 
     # old pipeline should keep its naming convention
-    info = pipeline.run(
-        airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock")
-    )
-    info.raise_on_failed_jobs()
+    pipeline.run(airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock"))
     # normalized names
     assert pipeline.last_trace.last_normalize_info.row_counts["_schedule"] == 3
     assert pipeline.naming.name() == "snake_case"
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index be8d274eb0..9ba933fa7f 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -29,7 +29,7 @@
     DestinationTerminalException,
     UnknownDestinationModule,
 )
-from dlt.common.exceptions import PipelineStateNotAvailable, TerminalValueError
+from dlt.common.exceptions import PipelineStateNotAvailable
 from dlt.common.pipeline import LoadInfo, PipelineContext
 from dlt.common.runtime.collector import LogCollector
 from dlt.common.schema.exceptions import TableIdentifiersFrozen
@@ -52,8 +52,7 @@
 from dlt.pipeline.pipeline import Pipeline
 
 from tests.common.utils import TEST_SENTRY_DSN
-from tests.common.configuration.utils import environment
-from tests.utils import TEST_STORAGE_ROOT, skipifnotwindows
+from tests.utils import TEST_STORAGE_ROOT
 from tests.extract.utils import expect_extracted_file
 from tests.pipeline.utils import (
@@ -65,6 +64,8 @@
     many_delayed,
 )
 
+DUMMY_COMPLETE = dummy(completed_prob=1)  # factory set up to complete jobs
+
 
 def test_default_pipeline() -> None:
     p = dlt.pipeline()
@@ -635,10 +636,8 @@ def with_table_hints():
 
 
 def test_restore_state_on_dummy() -> None:
-    os.environ["COMPLETED_PROB"] = "1.0"  # make it complete immediately
-
     pipeline_name = "pipe_" + uniq_id()
-    p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+    p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
     p.config.restore_from_destination = True
     info = p.run([1, 2, 3], table_name="dummy_table")
     print(info)
@@ -649,7 +648,7 @@ def test_restore_state_on_dummy() -> None:
 
     # wipe out storage
     p._wipe_working_folder()
-    p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+    p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
     assert p.first_run is True
     p.sync_destination()
     assert p.first_run is True
@@ -657,10 +656,8 @@ def test_restore_state_on_dummy() -> None:
 
 
 def test_first_run_flag() -> None:
-    os.environ["COMPLETED_PROB"] = "1.0"  # make it complete immediately
-
     pipeline_name = "pipe_" + uniq_id()
-    p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+    p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
     assert p.first_run is True
     # attach
     p = dlt.attach(pipeline_name=pipeline_name)
@@ -668,7 +665,7 @@ def test_first_run_flag() -> None:
     p.extract([1, 2, 3], table_name="dummy_table")
     assert p.first_run is True
     # attach again
-    p = dlt.attach(pipeline_name=pipeline_name)
+    p = dlt.attach(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
     assert p.first_run is True
     assert len(p.list_extracted_load_packages()) > 0
     p.normalize()
@@ -689,7 +686,7 @@ def test_first_run_flag() -> None:
 
 
 def test_has_pending_data_flag() -> None:
-    p = dlt.pipeline(pipeline_name="pipe_" + uniq_id(), destination="dummy")
+    p = dlt.pipeline(pipeline_name="pipe_" + uniq_id(), destination=DUMMY_COMPLETE)
     assert p.has_pending_data is False
     p.extract([1, 2, 3], table_name="dummy_table")
     assert p.has_pending_data is True
@@ -702,11 +699,10 @@ def test_has_pending_data_flag() -> None:
 
 def test_sentry_tracing() -> None:
     import sentry_sdk
 
-    os.environ["COMPLETED_PROB"] = "1.0"  # make it complete immediately
     os.environ["RUNTIME__SENTRY_DSN"] = TEST_SENTRY_DSN
     pipeline_name = "pipe_" + uniq_id()
-    p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+    p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
 
     # def inspect_transaction(ctx):
     #     print(ctx)
@@ -803,7 +799,7 @@ def data_schema_3():
 
     # new pipeline
     pipeline_name = "pipe_" + uniq_id()
-    p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+    p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
 
     with pytest.raises(PipelineStepFailed):
         p.run([data_schema_1(), data_schema_2(), data_schema_3()], write_disposition="replace")
@@ -815,14 +811,12 @@ def data_schema_3():
     assert len(p._schema_storage.list_schemas()) == 0
     assert p.default_schema_name is None
 
-    os.environ["COMPLETED_PROB"] = "1.0"  # make it complete immediately
     p.run([data_schema_1(), data_schema_2()], write_disposition="replace")
     assert set(p.schema_names) == set(p._schema_storage.list_schemas())
 
 
 def test_run_with_table_name_exceeding_path_length() -> None:
     pipeline_name = "pipe_" + uniq_id()
-    # os.environ["COMPLETED_PROB"] = "1.0"  # make it complete immediately
     p = dlt.pipeline(pipeline_name=pipeline_name)
 
     # we must fix that
@@ -833,7 +827,6 @@ def test_run_with_table_name_exceeding_path_length() -> None:
 
 def test_raise_on_failed_job() -> None:
     os.environ["FAIL_PROB"] = "1.0"
-    os.environ["RAISE_ON_FAILED_JOBS"] = "true"
     pipeline_name = "pipe_" + uniq_id()
     p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
     with pytest.raises(PipelineStepFailed) as py_ex:
@@ -850,15 +843,17 @@ def test_raise_on_failed_job() -> None:
 
 
 def test_load_info_raise_on_failed_jobs() -> None:
+    # By default, raises terminal error on a failed job and aborts load. This pipeline does not fail
    os.environ["COMPLETED_PROB"] = "1.0"
     pipeline_name = "pipe_" + uniq_id()
     p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
     load_info = p.run([1, 2, 3], table_name="numbers")
     assert load_info.has_failed_jobs is False
-    load_info.raise_on_failed_jobs()
+
+    # Test explicit raising on a failed job after the load is completed. Let pipeline fail
     os.environ["COMPLETED_PROB"] = "0.0"
     os.environ["FAIL_PROB"] = "1.0"
-
+    os.environ["RAISE_ON_FAILED_JOBS"] = "false"
     load_info = p.run([1, 2, 3], table_name="numbers")
     assert load_info.has_failed_jobs is True
     with pytest.raises(DestinationHasFailedJobs) as py_ex:
@@ -866,6 +861,7 @@ def test_load_info_raise_on_failed_jobs() -> None:
     assert py_ex.value.destination_name == "dummy"
     assert py_ex.value.load_id == load_info.loads_ids[0]
 
+    # Test automatic raising on a failed job which aborts the load. Let pipeline fail
     os.environ["RAISE_ON_FAILED_JOBS"] = "true"
     with pytest.raises(PipelineStepFailed) as py_ex_2:
         p.run([1, 2, 3], table_name="numbers")
@@ -879,9 +875,8 @@ def test_load_info_raise_on_failed_jobs() -> None:
 
 def test_run_load_pending() -> None:
     # prepare some data and complete load with run
-    os.environ["COMPLETED_PROB"] = "1.0"
     pipeline_name = "pipe_" + uniq_id()
-    p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+    p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
 
     def some_data():
         yield from [1, 2, 3]
@@ -910,9 +905,9 @@ def source():
 
 
 def test_retry_load() -> None:
+    os.environ["COMPLETED_PROB"] = "1.0"
     retry_count = 2
 
-    os.environ["COMPLETED_PROB"] = "1.0"
     pipeline_name = "pipe_" + uniq_id()
     p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
 
@@ -949,7 +944,6 @@ def fail_extract():
     assert py_ex.value.step == "extract"
 
     os.environ["COMPLETED_PROB"] = "0.0"
-    os.environ["RAISE_ON_FAILED_JOBS"] = "true"
     os.environ["FAIL_PROB"] = "1.0"
     with pytest.raises(PipelineStepFailed) as py_ex:
         for attempt in Retrying(
@@ -999,9 +993,8 @@ def _w_local_state():
 
 def test_changed_write_disposition() -> None:
-    os.environ["COMPLETED_PROB"] = "1.0"
     pipeline_name = "pipe_" + uniq_id()
-    p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+    p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
 
     @dlt.resource
     def resource_1():
         yield [1, 2, 3]
@@ -1048,9 +1041,8 @@ def _get_shuffled_events(repeat: int = 1):
 
 @pytest.mark.parametrize("github_resource", (github_repo_events_table_meta, github_repo_events))
 def test_dispatch_rows_to_tables(github_resource: DltResource):
-    os.environ["COMPLETED_PROB"] = "1.0"
     pipeline_name = "pipe_" + uniq_id()
-    p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+    p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
 
     info = p.run(_get_shuffled_events | github_resource)
     assert_load_info(info)
@@ -1096,7 +1088,7 @@ def some_source():
         return [static_data(), dynamic_func_data(), dynamic_mark_data(), nested_data()]
 
     source = some_source()
-    p = dlt.pipeline(pipeline_name=uniq_id(), destination="dummy")
+    p = dlt.pipeline(pipeline_name=uniq_id(), destination=DUMMY_COMPLETE)
     p.run(source)
 
     schema = p.default_schema
@@ -1378,8 +1370,6 @@ def test_emojis_resource_names() -> None:
 
 
 def test_apply_hints_infer_hints() -> None:
-    os.environ["COMPLETED_PROB"] = "1.0"
-
     @dlt.source
     def infer():
         yield dlt.resource(
@@ -1391,7 +1381,7 @@ def infer():
     new_new_hints = {"not_null": ["timestamp"], "primary_key": ["id"]}
     s = infer()
     s.schema.merge_hints(new_new_hints)  # type: ignore[arg-type]
-    pipeline = dlt.pipeline(pipeline_name="inf", destination="dummy")
+    pipeline = dlt.pipeline(pipeline_name="inf", destination=DUMMY_COMPLETE)
     pipeline.run(s)
     # check schema
     table = pipeline.default_schema.get_table("table1")
@@ -1440,7 +1430,7 @@ def test_invalid_data_edge_cases() -> None:
     def my_source():
         return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)
 
-    pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy")
+    pipeline = dlt.pipeline(pipeline_name="invalid", destination=DUMMY_COMPLETE)
     with pytest.raises(PipelineStepFailed) as pip_ex:
         pipeline.run(my_source)
     assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
@@ -1463,7 +1453,7 @@ def res_return():
     def my_source_yield():
         yield dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)
 
-    pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy")
+    pipeline = dlt.pipeline(pipeline_name="invalid", destination=DUMMY_COMPLETE)
     with pytest.raises(PipelineStepFailed) as pip_ex:
         pipeline.run(my_source_yield)
     assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
@@ -1736,6 +1726,7 @@ def test_pipeline_list_packages() -> None:
     assert normalized_package.state == "normalized"
     assert len(normalized_package.jobs["new_jobs"]) == len(extracted_package.jobs["new_jobs"])
     # load all 3 packages and fail all jobs in them
+    os.environ["RAISE_ON_FAILED_JOBS"] = "false"  # do not raise, complete package till the end
     os.environ["FAIL_PROB"] = "1.0"
     pipeline.load()
     load_ids_l = pipeline.list_completed_load_packages()
@@ -1748,7 +1739,7 @@ def test_pipeline_list_packages() -> None:
 
 
 def test_remove_pending_packages() -> None:
-    pipeline = dlt.pipeline(pipeline_name="emojis", destination="dummy")
+    pipeline = dlt.pipeline(pipeline_name="emojis", destination=DUMMY_COMPLETE)
     pipeline.extract(airtable_emojis())
     assert pipeline.has_pending_data
     pipeline.drop_pending_packages()
@@ -2477,26 +2468,28 @@ def test_import_jsonl_file() -> None:
         loader_file_format="jsonl",
         columns=columns,
     )
-    info.raise_on_failed_jobs()
     print(info)
     assert_imported_file(pipeline, "no_header", columns, 2)
 
     # use hints to infer
     hints = dlt.mark.make_hints(columns=columns)
-    info = pipeline.run(
+    pipeline.run(
         [dlt.mark.with_file_import(import_file, "jsonl", 2, hints=hints)],
         table_name="no_header_2",
     )
-    info.raise_on_failed_jobs()
     assert_imported_file(pipeline, "no_header_2", columns, 2, expects_state=False)
 
 
 def test_import_file_without_sniff_schema() -> None:
+    os.environ["RAISE_ON_FAILED_JOBS"] = "false"
+
     pipeline = dlt.pipeline(
         pipeline_name="test_jsonl_import",
         destination="duckdb",
         dev_mode=True,
     )
+
+    # table will not be found which is terminal exception
     import_file = "tests/load/cases/loading/header.jsonl"
     info = pipeline.run(
         [dlt.mark.with_file_import(import_file, "jsonl", 2)],
diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py
index af3a6c239e..0a2f8f3dd9 100644
--- a/tests/pipeline/test_pipeline_extra.py
+++ b/tests/pipeline/test_pipeline_extra.py
@@ -34,13 +34,17 @@ class BaseModel:  # type: ignore[no-redef]
 from dlt.extract.storage import ExtractStorage
 from dlt.extract.validation import PydanticValidator
 
+from dlt.destinations import dummy
+
 from dlt.pipeline import TCollectorArg
 
-from tests.utils import TEST_STORAGE_ROOT, test_storage
+from tests.utils import TEST_STORAGE_ROOT
 from tests.extract.utils import expect_extracted_file
 from tests.load.utils import DestinationTestConfiguration, destinations_configs
 from tests.pipeline.utils import assert_load_info, load_data_table_counts, many_delayed
 
+DUMMY_COMPLETE = dummy(completed_prob=1)  # factory set up to complete jobs
+
 
 @pytest.mark.parametrize(
     "destination_config",
@@ -76,6 +80,8 @@ def test_create_pipeline_all_destinations(destination_config: DestinationTestCon
 
 @pytest.mark.parametrize("progress", ["tqdm", "enlighten", "log", "alive_progress"])
 def test_pipeline_progress(progress: TCollectorArg) -> None:
+    # do not raise on failed jobs
+    os.environ["RAISE_ON_FAILED_JOBS"] = "false"
     os.environ["TIMEOUT"] = "3.0"
 
     p = dlt.pipeline(destination="dummy", progress=progress)
@@ -528,7 +534,6 @@ def jsonl_data():
     assert jsonl_pq.compute_table_schema()["file_format"] == "parquet"
 
     info = dlt.pipeline("example", destination="duckdb").run([jsonl_preferred, jsonl_r, jsonl_pq])
-    info.raise_on_failed_jobs()
     # check file types on load jobs
     load_jobs = {
         job.job_file_info.table_name: job.job_file_info
@@ -542,7 +547,6 @@ def jsonl_data():
     csv_r = dlt.resource(jsonl_data, file_format="csv", name="csv_r")
     assert csv_r.compute_table_schema()["file_format"] == "csv"
     info = dlt.pipeline("example", destination="duckdb").run(csv_r)
-    info.raise_on_failed_jobs()
     # fallback to preferred
     load_jobs = {
         job.job_file_info.table_name: job.job_file_info
diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py
index d2bb035a17..9c12519a89 100644
--- a/tests/pipeline/test_pipeline_trace.py
+++ b/tests/pipeline/test_pipeline_trace.py
@@ -17,7 +17,7 @@
 from dlt.common.pipeline import ExtractInfo, NormalizeInfo, LoadInfo
 from dlt.common.schema import Schema
 from dlt.common.runtime.telemetry import stop_telemetry
-from dlt.common.typing import DictStrAny, StrStr, DictStrStr, TSecretValue
+from dlt.common.typing import DictStrAny, DictStrStr, TSecretValue
 from dlt.common.utils import digest128
 
 from dlt.destinations import dummy, filesystem
@@ -36,7 +36,6 @@
 
 from tests.pipeline.utils import PIPELINE_TEST_CASES_PATH
 from tests.utils import TEST_STORAGE_ROOT, start_test_telemetry
-from tests.common.configuration.utils import toml_providers, environment
 
 
 def test_create_trace(toml_providers: ConfigProvidersContext, environment: Any) -> None:
@@ -328,8 +327,7 @@ def data():
     os.environ["API_TYPE"] = "REST"
     os.environ["SOURCES__MANY_HINTS__CREDENTIALS"] = "CREDS"
 
-    info = pipeline.run([many_hints(), github()])
-    info.raise_on_failed_jobs()
+    pipeline.run([many_hints(), github()])
 
     trace = pipeline.last_trace
     pipeline._schema_storage.storage.save("trace.json", json.dumps(trace, pretty=True))
@@ -338,8 +336,7 @@ def data():
     trace_pipeline = dlt.pipeline(
         pipeline_name="test_trace_schema_traces", destination=dummy(completed_prob=1.0)
     )
-    info = trace_pipeline.run([trace], table_name="trace", schema=schema)
-    info.raise_on_failed_jobs()
+    trace_pipeline.run([trace], table_name="trace", schema=schema)
 
     # add exception trace
     with pytest.raises(PipelineStepFailed):
@@ -350,8 +347,7 @@ def data():
         "trace_exception.json", json.dumps(trace_exception, pretty=True)
     )
 
-    info = trace_pipeline.run([trace_exception], table_name="trace")
-    info.raise_on_failed_jobs()
+    trace_pipeline.run([trace_exception], table_name="trace")
 
     inferred_trace_contract = trace_pipeline.schemas["trace"]
     inferred_contract_str = inferred_trace_contract.to_pretty_yaml(remove_processing_hints=True)
@@ -373,7 +369,7 @@ def data():
     contract_trace_pipeline = dlt.pipeline(
         pipeline_name="test_trace_schema_traces_contract", destination=dummy(completed_prob=1.0)
     )
-    info = contract_trace_pipeline.run(
+    contract_trace_pipeline.run(
         [trace_exception, trace],
         table_name="trace",
         schema=trace_contract,
@@ -517,6 +513,8 @@ def test_trace_telemetry() -> None:
         SENTRY_SENT_ITEMS.clear()
         # make dummy fail all files
         os.environ["FAIL_PROB"] = "1.0"
+        # but do not raise exceptions
+        os.environ["RAISE_ON_FAILED_JOBS"] = "false"
         load_info = dlt.pipeline().run(
             [1, 2, 3], table_name="data", destination="dummy", dataset_name="data_data"
         )
@@ -694,7 +692,6 @@ def assert_trace_serializable(trace: PipelineTrace) -> None:
     from dlt.destinations import duckdb
 
     trace_pipeline = dlt.pipeline("trace", destination=duckdb(":pipeline:")).drop()
-    load_info = trace_pipeline.run([trace], table_name="trace_data")
-    load_info.raise_on_failed_jobs()
+    trace_pipeline.run([trace], table_name="trace_data")
 
     # print(trace_pipeline.default_schema.to_pretty_yaml())
diff --git a/tests/pipeline/test_schema_contracts.py b/tests/pipeline/test_schema_contracts.py
index 4b46bb7c3e..bf48e347c2 100644
--- a/tests/pipeline/test_schema_contracts.py
+++ b/tests/pipeline/test_schema_contracts.py
@@ -733,7 +733,8 @@ def test_pydantic_contract_implementation(contract_setting: str, as_list: bool)
     from pydantic import BaseModel
 
     class Items(BaseModel):
-        id: int  # noqa: A003
+        # for variant test below we must allow id to be nullable
+        id: Optional[int]  # noqa: A003
         name: str
 
     def get_items(as_list: bool = False):
diff --git a/tests/utils.py b/tests/utils.py
index 63acb96be7..9facdfc375 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -344,7 +344,7 @@ def assert_load_info(info: LoadInfo, expected_load_packages: int = 1) -> None:
 
     assert len(info.loads_ids) == expected_load_packages
     # all packages loaded
     assert all(package.state == "loaded" for package in info.load_packages) is True
-    # no failed jobs in any of the packages
+    # Explicitly check for no failed jobs in any load package, in case raise_on_failed_jobs=False disabled the terminal exception
     info.raise_on_failed_jobs()