From e851b848da6cb8e47d60780376d5fe65a87d92b3 Mon Sep 17 00:00:00 2001 From: Willi Date: Mon, 9 Sep 2024 20:51:21 +0530 Subject: [PATCH] Removes redundant calls to raise_on_failed_jobs() in entire test suite. Refactors tests where necessary. --- dlt/helpers/airflow_helper.py | 4 ++- .../chess_production/chess_production.py | 2 +- .../custom_destination_bigquery.py | 3 --- .../test_parent_child_relationship.py | 6 ++--- .../docs/running-in-production/running.md | 2 +- tests/destinations/test_custom_destination.py | 1 - tests/extract/test_incremental.py | 22 +++++++--------- .../bigquery/test_bigquery_table_builder.py | 4 +-- tests/load/pipeline/test_csv_loading.py | 1 - tests/load/pipeline/test_drop.py | 4 +-- tests/load/pipeline/test_duckdb.py | 22 ++++++---------- .../load/pipeline/test_filesystem_pipeline.py | 5 ---- tests/load/pipeline/test_pipelines.py | 8 +++--- tests/load/qdrant/test_restore_state.py | 4 +-- .../sql_database/test_sql_database_source.py | 10 +++---- tests/load/test_dummy_client.py | 11 ++++++-- tests/pipeline/test_arrow_sources.py | 26 +++++++++---------- tests/pipeline/test_dlt_versions.py | 11 +++----- tests/pipeline/test_pipeline.py | 18 ++++++------- tests/pipeline/test_pipeline_extra.py | 4 +-- tests/pipeline/test_pipeline_trace.py | 17 +++++------- tests/utils.py | 2 +- 22 files changed, 79 insertions(+), 108 deletions(-) diff --git a/dlt/helpers/airflow_helper.py b/dlt/helpers/airflow_helper.py index 5a3b3baf38..932b52c76d 100644 --- a/dlt/helpers/airflow_helper.py +++ b/dlt/helpers/airflow_helper.py @@ -266,7 +266,9 @@ def _run( if self.abort_task_if_any_job_failed is not None: dlt.config["load.raise_on_failed_jobs"] = self.abort_task_if_any_job_failed - logger.info("Set load.abort_task_if_any_job_failed to {self.abort_task_if_any_job_failed}") + logger.info( + "Set load.abort_task_if_any_job_failed to {self.abort_task_if_any_job_failed}" + ) if self.log_progress_period > 0 and task_pipeline.collector == NULL_COLLECTOR: task_pipeline.collector = log(log_period=self.log_progress_period, logger=logger.LOGGER) diff --git a/docs/examples/chess_production/chess_production.py b/docs/examples/chess_production/chess_production.py index bfbc16b39e..196bc13e18 100644 --- a/docs/examples/chess_production/chess_production.py +++ b/docs/examples/chess_production/chess_production.py @@ -167,4 +167,4 @@ def load_data_with_retry(pipeline, data): ) # get data for a few famous players data = chess(max_players=MAX_PLAYERS) - load_info = load_data_with_retry(pipeline, data) + load_data_with_retry(pipeline, data) diff --git a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py index 48a16f15c0..67e5d5bb4a 100644 --- a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py +++ b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py @@ -91,6 +91,3 @@ def bigquery_insert( load_info = pipeline.run(resource(url=OWID_DISASTERS_URL)) print(load_info) - - # make sure nothing failed - load_info.raise_on_failed_jobs() diff --git a/docs/examples/parent_child_relationship/test_parent_child_relationship.py b/docs/examples/parent_child_relationship/test_parent_child_relationship.py index 95d1bade97..e86ab6f626 100644 --- a/docs/examples/parent_child_relationship/test_parent_child_relationship.py +++ b/docs/examples/parent_child_relationship/test_parent_child_relationship.py @@ -1,3 +1,4 @@ + import pytest from tests.utils import skipifgithubfork @@ -10,9 +11,8 @@ 
keywords: [parent child relationship, parent key] --- -This example demonstrates handling data with parent-child relationships using -the `dlt` library. You learn how to integrate specific fields (e.g., primary, -foreign keys) from a parent record into each child record. +This example demonstrates handling data with parent-child relationships using the `dlt` library. +You learn how to integrate specific fields (e.g., primary, foreign keys) from a parent record into each child record. In this example, we'll explore how to: diff --git a/docs/website/docs/running-in-production/running.md b/docs/website/docs/running-in-production/running.md index cc17dae7bb..0c010332e7 100644 --- a/docs/website/docs/running-in-production/running.md +++ b/docs/website/docs/running-in-production/running.md @@ -262,7 +262,7 @@ def check(ex: Exception): If any job in the package **fails terminally** it will be moved to `failed_jobs` folder and assigned such status. By default, **an exceptions is raised** and on the first failed job, the load package will be aborted with `LoadClientJobFailed` (terminal exception). -Such package will be completed but its load id is not added to the `_dlt_loads` table. +Such package will be completed but its load id is not added to the `_dlt_loads` table. All the jobs that were running in parallel are completed before raising. The dlt state, if present, will not be visible to `dlt`. Here is an example `config.toml` to disable this behavior: diff --git a/tests/destinations/test_custom_destination.py b/tests/destinations/test_custom_destination.py index 5b775d3efd..30d06e8b96 100644 --- a/tests/destinations/test_custom_destination.py +++ b/tests/destinations/test_custom_destination.py @@ -507,7 +507,6 @@ def sink_func_with_spec( [1, 2, 3], table_name="items" ) - # check destination with additional config params @dlt.destination(spec=MyDestinationSpec) def sink_func_with_spec_and_additional_params( diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index a2a3fbafac..0a0de75987 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -207,8 +207,8 @@ def some_data(created_at=dlt.sources.incremental("created_at")): pipeline_name=uniq_id(), destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")), ) - p.run(some_data()).raise_on_failed_jobs() - p.run(some_data()).raise_on_failed_jobs() + p.run(some_data()) + p.run(some_data()) with p.sql_client() as c: with c.execute_query("SELECT created_at, id FROM some_data order by created_at, id") as cur: @@ -248,8 +248,8 @@ def some_data(created_at=dlt.sources.incremental("created_at")): pipeline_name=uniq_id(), destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")), ) - p.run(some_data()).raise_on_failed_jobs() - p.run(some_data()).raise_on_failed_jobs() + p.run(some_data()) + p.run(some_data()) with p.sql_client() as c: with c.execute_query("SELECT created_at, id FROM some_data order by created_at, id") as cur: @@ -455,7 +455,7 @@ def some_data(created_at=dlt.sources.incremental("created_at")): pipeline_name=uniq_id(), destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")), ) - p.run(some_data()).raise_on_failed_jobs() + p.run(some_data()) with p.sql_client() as c: with c.execute_query( @@ -1325,12 +1325,11 @@ def some_data( ): yield from source_items - info = p.run(some_data()) - info.raise_on_failed_jobs() + p.run(some_data()) norm_info = p.last_trace.last_normalize_info assert norm_info.row_counts["some_data"] == 20 # 
load incrementally - info = p.run(some_data()) + p.run(some_data()) norm_info = p.last_trace.last_normalize_info assert "some_data" not in norm_info.row_counts @@ -2560,11 +2559,8 @@ def test_source(): pip_1_name = "test_pydantic_columns_validator_" + uniq_id() pipeline = dlt.pipeline(pipeline_name=pip_1_name, destination="duckdb") - info = pipeline.run(test_source()) - info.raise_on_failed_jobs() - - info = pipeline.run(test_source_incremental()) - info.raise_on_failed_jobs() + pipeline.run(test_source()) + pipeline.run(test_source_incremental()) # verify that right steps are at right place steps = test_source().table_name._pipe._steps diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py index 63ac645113..169a67a6da 100644 --- a/tests/load/bigquery/test_bigquery_table_builder.py +++ b/tests/load/bigquery/test_bigquery_table_builder.py @@ -38,7 +38,6 @@ drop_active_pipeline_data, TABLE_UPDATE, sequence_generator, - empty_schema, ) # mark all tests as essential, do not remove @@ -1019,8 +1018,7 @@ def sources() -> List[DltResource]: dlt.resource([{"col2": "ABC"}], name="hints"), table_description="Once upon a time a small table got hinted twice.", ) - info = pipeline.run(mod_hints) - info.raise_on_failed_jobs() + pipeline.run(mod_hints) assert pipeline.last_trace.last_normalize_info.row_counts["hints"] == 1 with pipeline.sql_client() as c: diff --git a/tests/load/pipeline/test_csv_loading.py b/tests/load/pipeline/test_csv_loading.py index 6a2be2eb40..926d28112b 100644 --- a/tests/load/pipeline/test_csv_loading.py +++ b/tests/load/pipeline/test_csv_loading.py @@ -92,7 +92,6 @@ def test_custom_csv_no_header( table_name="no_header", loader_file_format=file_format, ) - info.raise_on_failed_jobs() print(info) assert_only_table_columns(pipeline, "no_header", [col["name"] for col in columns]) rows = load_tables_to_dicts(pipeline, "no_header") diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index e1c6ec9d79..40b83862ca 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -1,5 +1,5 @@ import os -from typing import Any, Iterator, Dict, Any, List +from typing import Iterator, Dict, Any, List from unittest import mock from itertools import chain @@ -355,7 +355,7 @@ def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConf attached.extract(droppable_source()) # TODO: individual steps cause pipeline.run() never raises attached.normalize() - attached.load(raise_on_failed_jobs=True) + attached.load() @pytest.mark.parametrize( diff --git a/tests/load/pipeline/test_duckdb.py b/tests/load/pipeline/test_duckdb.py index 2fa44d77c5..d0c5fe3820 100644 --- a/tests/load/pipeline/test_duckdb.py +++ b/tests/load/pipeline/test_duckdb.py @@ -30,17 +30,15 @@ def test_duck_case_names(destination_config: DestinationTestConfiguration) -> No os.environ["SCHEMA__NAMING"] = "duck_case" pipeline = destination_config.setup_pipeline("test_duck_case_names") # create tables and columns with emojis and other special characters - info = pipeline.run( + pipeline.run( airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock"), loader_file_format=destination_config.file_format, ) - info.raise_on_failed_jobs() - info = pipeline.run( + pipeline.run( [{"🐾Feet": 2, "1+1": "two", "\nhey": "value"}], table_name="🦚Peacocks🦚", loader_file_format=destination_config.file_format, ) - info.raise_on_failed_jobs() table_counts = load_table_counts( pipeline, *[t["name"] for t in 
pipeline.default_schema.data_tables()] ) @@ -103,13 +101,12 @@ def test_duck_precision_types(destination_config: DestinationTestConfiguration) "col5_int": 2**64 // 2 - 1, } ] - info = pipeline.run( + pipeline.run( row, table_name="row", loader_file_format=destination_config.file_format, columns=TABLE_UPDATE_ALL_TIMESTAMP_PRECISIONS + TABLE_UPDATE_ALL_INT_PRECISIONS, ) - info.raise_on_failed_jobs() with pipeline.sql_client() as client: table = client.native_connection.sql("SELECT * FROM row").arrow() @@ -163,14 +160,13 @@ class EventV1(BaseModel): event = {"ver": 1, "id": "id1", "details": {"detail_id": "detail_1", "is_complete": False}} - info = pipeline.run( + pipeline.run( [event], table_name="events", columns=EventV1, loader_file_format="parquet", schema_contract="evolve", ) - info.raise_on_failed_jobs() print(pipeline.default_schema.to_pretty_yaml()) # we will use a different pipeline with a separate schema but writing to the same dataset and to the same table @@ -196,14 +192,13 @@ class EventV2(BaseModel): "test_new_nested_prop_parquet_2", dataset_name="test_dataset" ) pipeline.destination = duck_factory # type: ignore - info = pipeline.run( + pipeline.run( [event], table_name="events", columns=EventV2, loader_file_format="parquet", schema_contract="evolve", ) - info.raise_on_failed_jobs() print(pipeline.default_schema.to_pretty_yaml()) @@ -216,8 +211,7 @@ def test_jsonl_reader(destination_config: DestinationTestConfiguration) -> None: pipeline = destination_config.setup_pipeline("test_jsonl_reader") data = [{"a": 1, "b": 2}, {"a": 1}] - info = pipeline.run(data, table_name="data", loader_file_format="jsonl") - info.raise_on_failed_jobs() + pipeline.run(data, table_name="data", loader_file_format="jsonl") @pytest.mark.parametrize( @@ -242,8 +236,8 @@ def _get_shuffled_events(repeat: int = 1): pipeline = destination_config.setup_pipeline("test_provoke_parallel_parquet_same_table") - info = pipeline.run(_get_shuffled_events(50)) - info.raise_on_failed_jobs() + pipeline.run(_get_shuffled_events(50)) + assert_data_table_counts( pipeline, expected_counts={ diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index bc6cbd9848..bddd41c632 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -44,7 +44,6 @@ def test_pipeline_merge_write_disposition(default_buckets_env: str) -> None: """Run pipeline twice with merge write disposition Regardless wether primary key is set or not, filesystem appends """ - import pyarrow.parquet as pq # Module is evaluated by other tests os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True" @@ -102,7 +101,6 @@ def test_pipeline_csv_filesystem_destination(item_type: TestDataItemFormat) -> N item, rows, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True) info = pipeline.run(item, table_name="table", loader_file_format="csv") - info.raise_on_failed_jobs() job = info.load_packages[0].jobs["completed_jobs"][0].file_path assert job.endswith("csv") with open(job, "r", encoding="utf-8", newline="") as f: @@ -128,7 +126,6 @@ def test_csv_options(item_type: TestDataItemFormat) -> None: item, rows, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True) info = pipeline.run(item, table_name="table", loader_file_format="csv") - info.raise_on_failed_jobs() job = info.load_packages[0].jobs["completed_jobs"][0].file_path assert job.endswith("csv") with open(job, "r", encoding="utf-8", newline="") as f: @@ 
-157,7 +154,6 @@ def test_csv_quoting_style(item_type: TestDataItemFormat) -> None: item, _, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True) info = pipeline.run(item, table_name="table", loader_file_format="csv") - info.raise_on_failed_jobs() job = info.load_packages[0].jobs["completed_jobs"][0].file_path assert job.endswith("csv") with open(job, "r", encoding="utf-8", newline="") as f: @@ -693,7 +689,6 @@ def test_delta_table_empty_source( Tests both empty Arrow table and `dlt.mark.materialize_table_schema()`. """ - from dlt.common.libs.pyarrow import pyarrow as pa from dlt.common.libs.deltalake import ensure_delta_compatible_arrow_data, get_delta_tables from tests.pipeline.utils import users_materialize_table_schema diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index 2792cec085..4b250959ad 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -9,7 +9,6 @@ from dlt.common import json, sleep from dlt.common.pipeline import SupportsPipeline from dlt.common.destination import Destination -from dlt.common.destination.exceptions import DestinationHasFailedJobs from dlt.common.destination.reference import WithStagingDataset from dlt.common.schema.exceptions import CannotCoerceColumnException from dlt.common.schema.schema import Schema @@ -24,6 +23,7 @@ from dlt.destinations.job_client_impl import SqlJobClientBase from dlt.extract.exceptions import ResourceNameMissing from dlt.extract.source import DltSource +from dlt.load.exceptions import LoadClientJobFailed from dlt.pipeline.exceptions import ( CannotRestorePipelineException, PipelineConfigMissing, @@ -767,9 +767,9 @@ def test_snowflake_custom_stage(destination_config: DestinationTestConfiguration """Using custom stage name instead of the table stage""" os.environ["DESTINATION__SNOWFLAKE__STAGE_NAME"] = "my_non_existing_stage" pipeline, data = simple_nested_pipeline(destination_config, f"custom_stage_{uniq_id()}", False) - info = pipeline.run(data(), loader_file_format=destination_config.file_format) - with pytest.raises(DestinationHasFailedJobs) as f_jobs: - info.raise_on_failed_jobs() + with pytest.raises(LoadClientJobFailed) as f_jobs: + pipeline.run(data(), loader_file_format=destination_config.file_format) + assert "MY_NON_EXISTING_STAGE" in f_jobs.value.failed_jobs[0].failed_message drop_active_pipeline_data() diff --git a/tests/load/qdrant/test_restore_state.py b/tests/load/qdrant/test_restore_state.py index 31bc725d24..63f575f6ec 100644 --- a/tests/load/qdrant/test_restore_state.py +++ b/tests/load/qdrant/test_restore_state.py @@ -1,11 +1,9 @@ -from typing import TYPE_CHECKING import pytest from qdrant_client import models import dlt from tests.load.utils import destinations_configs, DestinationTestConfiguration -from dlt.common.destination.reference import JobClientBase, WithStateSync from dlt.destinations.impl.qdrant.qdrant_job_client import QdrantClient @@ -37,7 +35,7 @@ def dummy_table(): pipeline.extract(dummy_table) pipeline.normalize() - info = pipeline.load(raise_on_failed_jobs=True) + info = pipeline.load() client: QdrantClient with pipeline.destination_client() as client: # type: ignore[assignment] diff --git a/tests/load/sources/sql_database/test_sql_database_source.py b/tests/load/sources/sql_database/test_sql_database_source.py index 58382877ee..f11f73ac45 100644 --- a/tests/load/sources/sql_database/test_sql_database_source.py +++ b/tests/load/sources/sql_database/test_sql_database_source.py @@ -1,7 
+1,5 @@ import os -import re from copy import deepcopy -from datetime import datetime # noqa: I251 from typing import Any, Callable, cast, List, Optional, Set import pytest @@ -505,7 +503,7 @@ def test_all_types_no_precision_hints( source.resources[table_name].add_map(unwrap_json_connector_x("json_col")) pipeline.extract(source) pipeline.normalize(loader_file_format="parquet") - pipeline.load().raise_on_failed_jobs() + pipeline.load() schema = pipeline.default_schema # print(pipeline.default_schema.to_pretty_yaml()) @@ -605,7 +603,7 @@ def test_deferred_reflect_in_source( pipeline.extract(source) # use insert values to convert parquet into INSERT pipeline.normalize(loader_file_format="insert_values") - pipeline.load().raise_on_failed_jobs() + pipeline.load() precision_table = pipeline.default_schema.get_table("has_precision") assert_precision_columns( precision_table["columns"], @@ -661,7 +659,7 @@ def test_deferred_reflect_in_resource( pipeline.extract(table) # use insert values to convert parquet into INSERT pipeline.normalize(loader_file_format="insert_values") - pipeline.load().raise_on_failed_jobs() + pipeline.load() precision_table = pipeline.default_schema.get_table("has_precision") assert_precision_columns( precision_table["columns"], @@ -954,7 +952,7 @@ def dummy_source(): pipeline.extract(source) pipeline.normalize() - pipeline.load().raise_on_failed_jobs() + pipeline.load() channel_rows = load_tables_to_dicts(pipeline, "chat_channel")["chat_channel"] assert channel_rows and all(row["active"] for row in channel_rows) diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py index 1ae440c2f2..7f0af692c6 100644 --- a/tests/load/test_dummy_client.py +++ b/tests/load/test_dummy_client.py @@ -227,8 +227,15 @@ def test_spool_job_failed() -> None: assert len(started_files) == 0 # test the whole - loader_config = LoaderConfiguration(raise_on_failed_jobs = False, workers=1, pool_type="none") - load = setup_loader(client_config=DummyClientConfiguration(fail_prob=1.0), loader_config=loader_config) + loader_config = LoaderConfiguration( + raise_on_failed_jobs=False, + workers=1, + pool_type="none", + ) + load = setup_loader( + client_config=DummyClientConfiguration(fail_prob=1.0), + loader_config=loader_config, + ) load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES) run_all(load) diff --git a/tests/pipeline/test_arrow_sources.py b/tests/pipeline/test_arrow_sources.py index 4cdccb1e34..13640cc0fb 100644 --- a/tests/pipeline/test_arrow_sources.py +++ b/tests/pipeline/test_arrow_sources.py @@ -2,8 +2,6 @@ from typing import Any import pytest import pandas as pd -import os -import io import pyarrow as pa import dlt @@ -236,7 +234,7 @@ def test_load_arrow_vary_schema(item_type: TPythonTableFormat) -> None: pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="duckdb") item, _, _ = arrow_table_all_data_types(item_type, include_not_normalized_name=False) - pipeline.run(item, table_name="data").raise_on_failed_jobs() + pipeline.run(item, table_name="data") item, _, _ = arrow_table_all_data_types(item_type, include_not_normalized_name=False) # remove int column @@ -246,7 +244,7 @@ def test_load_arrow_vary_schema(item_type: TPythonTableFormat) -> None: names = item.schema.names names.remove("int") item = item.select(names) - pipeline.run(item, table_name="data").raise_on_failed_jobs() + pipeline.run(item, table_name="data") @pytest.mark.parametrize("item_type", ["pandas", "arrow-table", "arrow-batch"]) @@ -310,10 +308,10 @@ def some_data(): 
assert schema.tables["some_data"]["columns"]["_dlt_id"]["data_type"] == "text" assert schema.tables["some_data"]["columns"]["_dlt_load_id"]["data_type"] == "text" - pipeline.load().raise_on_failed_jobs() + pipeline.load() # should be able to load again - pipeline.run(some_data()).raise_on_failed_jobs() + pipeline.run(some_data()) # should be able to load arrow without a column try: @@ -322,12 +320,12 @@ def some_data(): names = item.schema.names names.remove("int") item = item.select(names) - pipeline.run(item, table_name="some_data").raise_on_failed_jobs() + pipeline.run(item, table_name="some_data") # should be able to load arrow with a new column item, records, _ = arrow_table_all_data_types(item_type, num_rows=200) item = item.append_column("static_int", [[0] * 200]) - pipeline.run(item, table_name="some_data").raise_on_failed_jobs() + pipeline.run(item, table_name="some_data") schema = pipeline.default_schema assert schema.tables["some_data"]["columns"]["static_int"]["data_type"] == "bigint" @@ -380,8 +378,8 @@ def _to_item(table: Any) -> Any: assert normalize_info.row_counts["table"] == 5432 * 3 # load to duckdb - load_info = pipeline.load() - load_info.raise_on_failed_jobs() + pipeline.load() + @pytest.mark.parametrize("item_type", ["arrow-table", "pandas", "arrow-batch"]) @@ -423,7 +421,7 @@ def _to_item(table: Any) -> Any: shuffled_names.append("binary") assert actual_tbl.schema.names == shuffled_names - pipeline.load().raise_on_failed_jobs() + pipeline.load() @pytest.mark.parametrize("item_type", ["arrow-table", "pandas", "arrow-batch"]) @@ -475,7 +473,7 @@ def _to_item(table: Any) -> Any: assert len(actual_tbl) == 5432 * 3 assert actual_tbl.schema.names == shuffled_names - pipeline.load().raise_on_failed_jobs() + pipeline.load() @pytest.mark.parametrize("item_type", ["pandas", "arrow-table", "arrow-batch"]) @@ -543,11 +541,11 @@ def test_import_file_with_arrow_schema() -> None: # columns should be created from empty table import_file = "tests/load/cases/loading/header.jsonl" - info = pipeline.run( + pipeline.run( [dlt.mark.with_file_import(import_file, "jsonl", 2, hints=empty_table)], table_name="no_header", ) - info.raise_on_failed_jobs() + assert_only_table_columns(pipeline, "no_header", schema.names) rows = load_tables_to_dicts(pipeline, "no_header") assert len(rows["no_header"]) == 2 diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py index 319055184a..620b8c05e8 100644 --- a/tests/pipeline/test_dlt_versions.py +++ b/tests/pipeline/test_dlt_versions.py @@ -27,17 +27,16 @@ from dlt.destinations.impl.duckdb.sql_client import DuckDbSqlClient from tests.pipeline.utils import airtable_emojis, load_table_counts -from tests.utils import TEST_STORAGE_ROOT, test_storage +from tests.utils import TEST_STORAGE_ROOT def test_simulate_default_naming_convention_change() -> None: # checks that (future) change in the naming convention won't affect existing pipelines pipeline = dlt.pipeline("simulated_snake_case", destination="duckdb") assert pipeline.naming.name() == "snake_case" - info = pipeline.run( + pipeline.run( airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock") ) - info.raise_on_failed_jobs() # normalized names assert pipeline.last_trace.last_normalize_info.row_counts["_schedule"] == 3 assert "_schedule" in pipeline.default_schema.tables @@ -51,19 +50,17 @@ def test_simulate_default_naming_convention_change() -> None: print(airtable_emojis().schema.naming.name()) # run new and old pipelines - info = duck_pipeline.run( + 
duck_pipeline.run( airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock") ) - info.raise_on_failed_jobs() print(duck_pipeline.last_trace.last_normalize_info.row_counts) assert duck_pipeline.last_trace.last_normalize_info.row_counts["📆 Schedule"] == 3 assert "📆 Schedule" in duck_pipeline.default_schema.tables # old pipeline should keep its naming convention - info = pipeline.run( + pipeline.run( airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock") ) - info.raise_on_failed_jobs() # normalized names assert pipeline.last_trace.last_normalize_info.row_counts["_schedule"] == 3 assert pipeline.naming.name() == "snake_case" diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 535d5d28e4..94e8c223a0 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -29,7 +29,7 @@ DestinationTerminalException, UnknownDestinationModule, ) -from dlt.common.exceptions import PipelineStateNotAvailable, TerminalValueError +from dlt.common.exceptions import PipelineStateNotAvailable from dlt.common.pipeline import LoadInfo, PipelineContext from dlt.common.runtime.collector import LogCollector from dlt.common.schema.exceptions import TableIdentifiersFrozen @@ -52,8 +52,7 @@ from dlt.pipeline.pipeline import Pipeline from tests.common.utils import TEST_SENTRY_DSN -from tests.common.configuration.utils import environment -from tests.utils import TEST_STORAGE_ROOT, skipifnotwindows +from tests.utils import TEST_STORAGE_ROOT from tests.extract.utils import expect_extracted_file from tests.pipeline.utils import ( assert_data_table_counts, @@ -833,7 +832,7 @@ def test_run_with_table_name_exceeding_path_length() -> None: def test_raise_on_failed_job() -> None: os.environ["FAIL_PROB"] = "1.0" - os.environ["RAISE_ON_FAILED_JOBS"] = "true" + os.environ["RAISE_ON_FAILED_JOBS"] = "true" # TODO: why is this necessary? pipeline_name = "pipe_" + uniq_id() p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") with pytest.raises(PipelineStepFailed) as py_ex: @@ -850,15 +849,17 @@ def test_raise_on_failed_job() -> None: def test_load_info_raise_on_failed_jobs() -> None: + # By default, raises terminal error on a failed job and aborts load. This pipeline does not fail os.environ["COMPLETED_PROB"] = "1.0" pipeline_name = "pipe_" + uniq_id() p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") load_info = p.run([1, 2, 3], table_name="numbers") assert load_info.has_failed_jobs is False - load_info.raise_on_failed_jobs() + + # Test explicit raising on a failed job after the load is completed. Let pipeline fail os.environ["COMPLETED_PROB"] = "0.0" os.environ["FAIL_PROB"] = "1.0" - + os.environ["RAISE_ON_FAILED_JOBS"] = "false" load_info = p.run([1, 2, 3], table_name="numbers") assert load_info.has_failed_jobs is True with pytest.raises(DestinationHasFailedJobs) as py_ex: @@ -866,6 +867,7 @@ def test_load_info_raise_on_failed_jobs() -> None: assert py_ex.value.destination_name == "dummy" assert py_ex.value.load_id == load_info.loads_ids[0] + # Test automatic raising on a failed job which aborts the load. 
Let pipeline fail os.environ["RAISE_ON_FAILED_JOBS"] = "true" with pytest.raises(PipelineStepFailed) as py_ex_2: p.run([1, 2, 3], table_name="numbers") @@ -2477,17 +2479,15 @@ def test_import_jsonl_file() -> None: loader_file_format="jsonl", columns=columns, ) - info.raise_on_failed_jobs() print(info) assert_imported_file(pipeline, "no_header", columns, 2) # use hints to infer hints = dlt.mark.make_hints(columns=columns) - info = pipeline.run( + pipeline.run( [dlt.mark.with_file_import(import_file, "jsonl", 2, hints=hints)], table_name="no_header_2", ) - info.raise_on_failed_jobs() assert_imported_file(pipeline, "no_header_2", columns, 2, expects_state=False) diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py index af3a6c239e..600baff1a0 100644 --- a/tests/pipeline/test_pipeline_extra.py +++ b/tests/pipeline/test_pipeline_extra.py @@ -36,7 +36,7 @@ class BaseModel: # type: ignore[no-redef] from dlt.pipeline import TCollectorArg -from tests.utils import TEST_STORAGE_ROOT, test_storage +from tests.utils import TEST_STORAGE_ROOT from tests.extract.utils import expect_extracted_file from tests.load.utils import DestinationTestConfiguration, destinations_configs from tests.pipeline.utils import assert_load_info, load_data_table_counts, many_delayed @@ -528,7 +528,6 @@ def jsonl_data(): assert jsonl_pq.compute_table_schema()["file_format"] == "parquet" info = dlt.pipeline("example", destination="duckdb").run([jsonl_preferred, jsonl_r, jsonl_pq]) - info.raise_on_failed_jobs() # check file types on load jobs load_jobs = { job.job_file_info.table_name: job.job_file_info @@ -542,7 +541,6 @@ def jsonl_data(): csv_r = dlt.resource(jsonl_data, file_format="csv", name="csv_r") assert csv_r.compute_table_schema()["file_format"] == "csv" info = dlt.pipeline("example", destination="duckdb").run(csv_r) - info.raise_on_failed_jobs() # fallback to preferred load_jobs = { job.job_file_info.table_name: job.job_file_info diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index d2bb035a17..4178b9e632 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -17,7 +17,7 @@ from dlt.common.pipeline import ExtractInfo, NormalizeInfo, LoadInfo from dlt.common.schema import Schema from dlt.common.runtime.telemetry import stop_telemetry -from dlt.common.typing import DictStrAny, StrStr, DictStrStr, TSecretValue +from dlt.common.typing import DictStrAny, DictStrStr, TSecretValue from dlt.common.utils import digest128 from dlt.destinations import dummy, filesystem @@ -36,7 +36,6 @@ from tests.pipeline.utils import PIPELINE_TEST_CASES_PATH from tests.utils import TEST_STORAGE_ROOT, start_test_telemetry -from tests.common.configuration.utils import toml_providers, environment def test_create_trace(toml_providers: ConfigProvidersContext, environment: Any) -> None: @@ -328,8 +327,7 @@ def data(): os.environ["API_TYPE"] = "REST" os.environ["SOURCES__MANY_HINTS__CREDENTIALS"] = "CREDS" - info = pipeline.run([many_hints(), github()]) - info.raise_on_failed_jobs() + pipeline.run([many_hints(), github()]) trace = pipeline.last_trace pipeline._schema_storage.storage.save("trace.json", json.dumps(trace, pretty=True)) @@ -338,8 +336,7 @@ def data(): trace_pipeline = dlt.pipeline( pipeline_name="test_trace_schema_traces", destination=dummy(completed_prob=1.0) ) - info = trace_pipeline.run([trace], table_name="trace", schema=schema) - info.raise_on_failed_jobs() + trace_pipeline.run([trace], table_name="trace", 
schema=schema) # add exception trace with pytest.raises(PipelineStepFailed): @@ -350,8 +347,7 @@ def data(): "trace_exception.json", json.dumps(trace_exception, pretty=True) ) - info = trace_pipeline.run([trace_exception], table_name="trace") - info.raise_on_failed_jobs() + trace_pipeline.run([trace_exception], table_name="trace") inferred_trace_contract = trace_pipeline.schemas["trace"] inferred_contract_str = inferred_trace_contract.to_pretty_yaml(remove_processing_hints=True) @@ -373,7 +369,7 @@ def data(): contract_trace_pipeline = dlt.pipeline( pipeline_name="test_trace_schema_traces_contract", destination=dummy(completed_prob=1.0) ) - info = contract_trace_pipeline.run( + contract_trace_pipeline.run( [trace_exception, trace], table_name="trace", schema=trace_contract, @@ -694,7 +690,6 @@ def assert_trace_serializable(trace: PipelineTrace) -> None: from dlt.destinations import duckdb trace_pipeline = dlt.pipeline("trace", destination=duckdb(":pipeline:")).drop() - load_info = trace_pipeline.run([trace], table_name="trace_data") - load_info.raise_on_failed_jobs() + trace_pipeline.run([trace], table_name="trace_data") # print(trace_pipeline.default_schema.to_pretty_yaml()) diff --git a/tests/utils.py b/tests/utils.py index 63acb96be7..ee31914197 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -344,7 +344,7 @@ def assert_load_info(info: LoadInfo, expected_load_packages: int = 1) -> None: assert len(info.loads_ids) == expected_load_packages # all packages loaded assert all(package.state == "loaded" for package in info.load_packages) is True - # no failed jobs in any of the packages + # Explicitely check for no failed job in any load package. In case a terminal exception was disabled by raise_on_failed_jobs=False info.raise_on_failed_jobs()
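
For context on the pattern used throughout this patch: with `load.raise_on_failed_jobs` now defaulting to `true`, a terminally failed job aborts the load with `LoadClientJobFailed`, and the dummy-destination tests assert that failure as a `PipelineStepFailed` raised by `pipeline.run()` itself, rather than calling `load_info.raise_on_failed_jobs()` afterwards. A minimal sketch of that pattern, assuming the `dummy` destination's `FAIL_PROB`/`COMPLETED_PROB` settings used in these tests (the pipeline name is arbitrary):

```python
import os

import pytest

import dlt
from dlt.pipeline.exceptions import PipelineStepFailed

# make every job of the dummy destination fail terminally
os.environ["COMPLETED_PROB"] = "0.0"
os.environ["FAIL_PROB"] = "1.0"

pipeline = dlt.pipeline(pipeline_name="failing_pipe", destination="dummy")

# with the default load.raise_on_failed_jobs=true the load step aborts on the
# first failed job, so run() raises and no explicit
# load_info.raise_on_failed_jobs() call is needed afterwards
with pytest.raises(PipelineStepFailed):
    pipeline.run([1, 2, 3], table_name="numbers")
```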
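The previous behaviour remains reachable, as the updated `test_load_info_raise_on_failed_jobs` and the `running.md` note show: disable `load.raise_on_failed_jobs` and inspect the returned `LoadInfo` explicitly, which is also why `assert_load_info` in `tests/utils.py` keeps its `raise_on_failed_jobs()` call. A sketch of that opt-out path, under the same dummy-destination assumptions as above:

```python
import os

import pytest

import dlt
from dlt.common.destination.exceptions import DestinationHasFailedJobs

# opt out of the terminal exception so the load completes despite failed jobs
os.environ["RAISE_ON_FAILED_JOBS"] = "false"
os.environ["COMPLETED_PROB"] = "0.0"
os.environ["FAIL_PROB"] = "1.0"

pipeline = dlt.pipeline(pipeline_name="tolerant_pipe", destination="dummy")
load_info = pipeline.run([1, 2, 3], table_name="numbers")

assert load_info.has_failed_jobs is True
# the explicit check is only needed once the terminal exception is disabled
with pytest.raises(DestinationHasFailedJobs):
    load_info.raise_on_failed_jobs()
```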