From aa66a49cd54f7ba9f1a50218f655e55cfb6a846e Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 9 Sep 2024 23:34:42 +0200 Subject: [PATCH] fixes some test cases that started to abort --- .../snippets/destination-snippets.py | 1 + .../performance-snippets.py | 2 + .../test_bigquery_streaming_insert.py | 12 +++-- tests/load/conftest.py | 8 ++- tests/load/pipeline/test_csv_loading.py | 2 + .../load/pipeline/test_databricks_pipeline.py | 3 ++ tests/load/pipeline/test_pipelines.py | 2 + tests/load/pipeline/test_redshift.py | 17 +++--- tests/normalize/test_max_nesting.py | 4 +- tests/pipeline/conftest.py | 2 + tests/pipeline/test_pipeline.py | 52 ++++++++----------- tests/pipeline/test_pipeline_extra.py | 6 +++ tests/pipeline/test_pipeline_trace.py | 2 + tests/pipeline/test_schema_contracts.py | 3 +- 14 files changed, 72 insertions(+), 44 deletions(-) diff --git a/docs/website/docs/general-usage/snippets/destination-snippets.py b/docs/website/docs/general-usage/snippets/destination-snippets.py index d27eb4f0ae..c1c0f745c5 100644 --- a/docs/website/docs/general-usage/snippets/destination-snippets.py +++ b/docs/website/docs/general-usage/snippets/destination-snippets.py @@ -94,6 +94,7 @@ def destination_instantiation_snippet() -> None: # here dependencies dependencies will be imported, secrets pulled and destination accessed # we pass bucket_url explicitly and expect credentials passed by config provider load_info = pipeline.load(destination=filesystem(bucket_url=bucket_url)) + print(load_info) # @@@DLT_SNIPPET_END late_destination_access diff --git a/docs/website/docs/reference/performance_snippets/performance-snippets.py b/docs/website/docs/reference/performance_snippets/performance-snippets.py index c7c26d9e7b..33c29eb681 100644 --- a/docs/website/docs/reference/performance_snippets/performance-snippets.py +++ b/docs/website/docs/reference/performance_snippets/performance-snippets.py @@ -180,6 +180,8 @@ async def _run_async(): loop.run_in_executor(executor, 
_run_pipeline, pipeline_2, defer_table), ) # results contains two LoadInfo instances + print("pipeline_1", results[0]) + print("pipeline_2", results[1]) # load data asyncio.run(_run_async()) diff --git a/tests/load/bigquery/test_bigquery_streaming_insert.py b/tests/load/bigquery/test_bigquery_streaming_insert.py index c950a46f91..20d07c7c76 100644 --- a/tests/load/bigquery/test_bigquery_streaming_insert.py +++ b/tests/load/bigquery/test_bigquery_streaming_insert.py @@ -1,7 +1,10 @@ import pytest import dlt +from dlt.common.pipeline import LoadInfo from dlt.destinations.adapters import bigquery_adapter +from dlt.load.exceptions import LoadClientJobFailed +from dlt.pipeline.exceptions import PipelineStepFailed from tests.pipeline.utils import assert_load_info @@ -40,13 +43,16 @@ def test_resource(): test_resource.apply_hints(additional_table_hints={"x-insert-api": "streaming"}) pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery") - info = pipe.run(test_resource) + with pytest.raises(PipelineStepFailed) as pip_ex: + pipe.run(test_resource) + assert isinstance(pip_ex.value.step_info, LoadInfo) + assert pip_ex.value.step_info.has_failed_jobs # pick the failed job - failed_job = info.load_packages[0].jobs["failed_jobs"][0] + assert isinstance(pip_ex.value.__cause__, LoadClientJobFailed) assert ( """BigQuery streaming insert can only be used with `append`""" """ write_disposition, while the given resource has `merge`.""" - ) in failed_job.failed_message + ) in pip_ex.value.__cause__.failed_message def test_bigquery_streaming_nested_data(): diff --git a/tests/load/conftest.py b/tests/load/conftest.py index a110b1198f..76a7248e5b 100644 --- a/tests/load/conftest.py +++ b/tests/load/conftest.py @@ -2,7 +2,13 @@ import pytest from typing import Iterator -from tests.load.utils import ALL_BUCKETS, DEFAULT_BUCKETS, WITH_GDRIVE_BUCKETS, drop_pipeline +from tests.load.utils import ( + ALL_BUCKETS, + DEFAULT_BUCKETS, + WITH_GDRIVE_BUCKETS, + drop_pipeline, + 
empty_schema, +) from tests.utils import preserve_environ, patch_home_dir diff --git a/tests/load/pipeline/test_csv_loading.py b/tests/load/pipeline/test_csv_loading.py index 926d28112b..a2cc786915 100644 --- a/tests/load/pipeline/test_csv_loading.py +++ b/tests/load/pipeline/test_csv_loading.py @@ -113,6 +113,8 @@ def test_custom_csv_no_header( ids=lambda x: x.name, ) def test_custom_wrong_header(destination_config: DestinationTestConfiguration) -> None: + # do not raise on failed jobs + os.environ["RAISE_ON_FAILED_JOBS"] = "false" csv_format = CsvFormatConfiguration(delimiter="|", include_header=True) # apply to collected config pipeline = destination_config.setup_pipeline("postgres_" + uniq_id(), dev_mode=True) diff --git a/tests/load/pipeline/test_databricks_pipeline.py b/tests/load/pipeline/test_databricks_pipeline.py index 73868f4e97..9d152bb099 100644 --- a/tests/load/pipeline/test_databricks_pipeline.py +++ b/tests/load/pipeline/test_databricks_pipeline.py @@ -20,6 +20,9 @@ def test_databricks_external_location(destination_config: DestinationTestConfiguration) -> None: # do not interfere with state os.environ["RESTORE_FROM_DESTINATION"] = "False" + # let the package complete even with failed jobs + os.environ["RAISE_ON_FAILED_JOBS"] = "false" + dataset_name = "test_databricks_external_location" + uniq_id() from dlt.destinations import databricks, filesystem diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index b087adac7e..73bcbed035 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -381,6 +381,8 @@ def extended_rows(): # lets violate unique constraint on postgres, redshift and BQ ignore unique indexes if destination_config.destination == "postgres": + # let it complete even with PK violation (which is a terminal error) + os.environ["RAISE_ON_FAILED_JOBS"] = "false" assert p.dataset_name == dataset_name err_info = p.run( source(1).with_resources("simple_rows"), diff 
--git a/tests/load/pipeline/test_redshift.py b/tests/load/pipeline/test_redshift.py index 7c26ac17a9..ca98a653f9 100644 --- a/tests/load/pipeline/test_redshift.py +++ b/tests/load/pipeline/test_redshift.py @@ -3,7 +3,9 @@ import pytest import dlt +from dlt.common.destination.exceptions import UnsupportedDataType from dlt.common.utils import uniq_id +from dlt.pipeline.exceptions import PipelineStepFailed from tests.load.utils import destinations_configs, DestinationTestConfiguration from tests.cases import table_update_and_row, assert_all_data_types_row from tests.pipeline.utils import assert_load_info @@ -32,11 +34,10 @@ def my_resource() -> Iterator[Any]: def my_source() -> Any: return my_resource - info = pipeline.run(my_source(), **destination_config.run_kwargs) - - assert info.has_failed_jobs - - assert ( - "Redshift cannot load TIME columns from" - in info.load_packages[0].jobs["failed_jobs"][0].failed_message - ) + with pytest.raises(PipelineStepFailed) as pip_ex: + pipeline.run(my_source(), **destination_config.run_kwargs) + assert isinstance(pip_ex.value.__cause__, UnsupportedDataType) + if destination_config.file_format == "parquet": + assert pip_ex.value.__cause__.data_type == "time" + else: + assert pip_ex.value.__cause__.data_type in ("time", "binary") diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index ec44e1c4db..fb2b2d70f6 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -61,7 +61,7 @@ def bot_events(): pipeline_name = f"test_max_table_nesting_{nesting_level}_{expected_num_tables}" pipeline = dlt.pipeline( pipeline_name=pipeline_name, - destination=dummy(timeout=0.1), + destination=dummy(timeout=0.1, completed_prob=1), dev_mode=True, ) @@ -168,7 +168,7 @@ def some_data(): pipeline_name = "test_different_table_nesting_levels" pipeline = dlt.pipeline( pipeline_name=pipeline_name, - destination=dummy(timeout=0.1), + destination=dummy(timeout=0.1, completed_prob=1), 
dev_mode=True, ) diff --git a/tests/pipeline/conftest.py b/tests/pipeline/conftest.py index f6c47e35b1..2411999dac 100644 --- a/tests/pipeline/conftest.py +++ b/tests/pipeline/conftest.py @@ -4,5 +4,7 @@ patch_home_dir, wipe_pipeline, duckdb_pipeline_location, + test_storage, ) +from tests.common.configuration.utils import environment, toml_providers from tests.pipeline.utils import drop_dataset_from_env diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 45b52406e9..5d85a450d8 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -64,6 +64,8 @@ many_delayed, ) +DUMMY_COMPLETE = dummy(completed_prob=1) # factory set up to complete jobs + def test_default_pipeline() -> None: p = dlt.pipeline() @@ -634,10 +636,8 @@ def with_table_hints(): def test_restore_state_on_dummy() -> None: - os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately - pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) p.config.restore_from_destination = True info = p.run([1, 2, 3], table_name="dummy_table") print(info) @@ -648,7 +648,7 @@ def test_restore_state_on_dummy() -> None: # wipe out storage p._wipe_working_folder() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) assert p.first_run is True p.sync_destination() assert p.first_run is True @@ -656,10 +656,8 @@ def test_restore_state_on_dummy() -> None: def test_first_run_flag() -> None: - os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately - pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) assert p.first_run is True # attach p = dlt.attach(pipeline_name=pipeline_name) @@ -688,7 +686,7 @@ def 
test_first_run_flag() -> None: def test_has_pending_data_flag() -> None: - p = dlt.pipeline(pipeline_name="pipe_" + uniq_id(), destination="dummy") + p = dlt.pipeline(pipeline_name="pipe_" + uniq_id(), destination=DUMMY_COMPLETE) assert p.has_pending_data is False p.extract([1, 2, 3], table_name="dummy_table") assert p.has_pending_data is True @@ -701,11 +699,10 @@ def test_has_pending_data_flag() -> None: def test_sentry_tracing() -> None: import sentry_sdk - os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately os.environ["RUNTIME__SENTRY_DSN"] = TEST_SENTRY_DSN pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) # def inspect_transaction(ctx): # print(ctx) @@ -802,7 +799,7 @@ def data_schema_3(): # new pipeline pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) with pytest.raises(PipelineStepFailed): p.run([data_schema_1(), data_schema_2(), data_schema_3()], write_disposition="replace") @@ -814,14 +811,12 @@ def data_schema_3(): assert len(p._schema_storage.list_schemas()) == 0 assert p.default_schema_name is None - os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately p.run([data_schema_1(), data_schema_2()], write_disposition="replace") assert set(p.schema_names) == set(p._schema_storage.list_schemas()) def test_run_with_table_name_exceeding_path_length() -> None: pipeline_name = "pipe_" + uniq_id() - # os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately p = dlt.pipeline(pipeline_name=pipeline_name) # we must fix that @@ -833,7 +828,7 @@ def test_run_with_table_name_exceeding_path_length() -> None: def test_raise_on_failed_job() -> None: os.environ["FAIL_PROB"] = "1.0" pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, 
destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) with pytest.raises(PipelineStepFailed) as py_ex: p.run([1, 2, 3], table_name="numbers") assert py_ex.value.step == "load" @@ -880,9 +875,8 @@ def test_load_info_raise_on_failed_jobs() -> None: def test_run_load_pending() -> None: # prepare some data and complete load with run - os.environ["COMPLETED_PROB"] = "1.0" pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) def some_data(): yield from [1, 2, 3] @@ -913,9 +907,8 @@ def source(): def test_retry_load() -> None: retry_count = 2 - os.environ["COMPLETED_PROB"] = "1.0" pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) @dlt.resource def fail_extract(): @@ -999,9 +992,8 @@ def _w_local_state(): def test_changed_write_disposition() -> None: - os.environ["COMPLETED_PROB"] = "1.0" pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) @dlt.resource def resource_1(): @@ -1048,9 +1040,8 @@ def _get_shuffled_events(repeat: int = 1): @pytest.mark.parametrize("github_resource", (github_repo_events_table_meta, github_repo_events)) def test_dispatch_rows_to_tables(github_resource: DltResource): - os.environ["COMPLETED_PROB"] = "1.0" pipeline_name = "pipe_" + uniq_id() - p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE) info = p.run(_get_shuffled_events | github_resource) assert_load_info(info) @@ -1096,7 +1087,7 @@ def some_source(): return [static_data(), dynamic_func_data(), dynamic_mark_data(), nested_data()] source = some_source() - p = 
dlt.pipeline(pipeline_name=uniq_id(), destination="dummy") + p = dlt.pipeline(pipeline_name=uniq_id(), destination=DUMMY_COMPLETE) p.run(source) schema = p.default_schema @@ -1378,8 +1369,6 @@ def test_emojis_resource_names() -> None: def test_apply_hints_infer_hints() -> None: - os.environ["COMPLETED_PROB"] = "1.0" - @dlt.source def infer(): yield dlt.resource( @@ -1391,7 +1380,7 @@ def infer(): new_new_hints = {"not_null": ["timestamp"], "primary_key": ["id"]} s = infer() s.schema.merge_hints(new_new_hints) # type: ignore[arg-type] - pipeline = dlt.pipeline(pipeline_name="inf", destination="dummy") + pipeline = dlt.pipeline(pipeline_name="inf", destination=DUMMY_COMPLETE) pipeline.run(s) # check schema table = pipeline.default_schema.get_table("table1") @@ -1440,7 +1429,7 @@ def test_invalid_data_edge_cases() -> None: def my_source(): return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5) - pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy") + pipeline = dlt.pipeline(pipeline_name="invalid", destination=DUMMY_COMPLETE) with pytest.raises(PipelineStepFailed) as pip_ex: pipeline.run(my_source) assert isinstance(pip_ex.value.__context__, PipeGenInvalid) @@ -1463,7 +1452,7 @@ def res_return(): def my_source_yield(): yield dlt.resource(itertools.count(start=1), name="infinity").add_limit(5) - pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy") + pipeline = dlt.pipeline(pipeline_name="invalid", destination=DUMMY_COMPLETE) with pytest.raises(PipelineStepFailed) as pip_ex: pipeline.run(my_source_yield) assert isinstance(pip_ex.value.__context__, PipeGenInvalid) @@ -1736,6 +1725,7 @@ def test_pipeline_list_packages() -> None: assert normalized_package.state == "normalized" assert len(normalized_package.jobs["new_jobs"]) == len(extracted_package.jobs["new_jobs"]) # load all 3 packages and fail all jobs in them + os.environ["RAISE_ON_FAILED_JOBS"] = "false" # do not raise, complete package till the end 
os.environ["FAIL_PROB"] = "1.0" pipeline.load() load_ids_l = pipeline.list_completed_load_packages() @@ -1748,7 +1738,7 @@ def test_pipeline_list_packages() -> None: def test_remove_pending_packages() -> None: - pipeline = dlt.pipeline(pipeline_name="emojis", destination="dummy") + pipeline = dlt.pipeline(pipeline_name="emojis", destination=DUMMY_COMPLETE) pipeline.extract(airtable_emojis()) assert pipeline.has_pending_data pipeline.drop_pending_packages() @@ -2490,11 +2480,15 @@ def test_import_jsonl_file() -> None: def test_import_file_without_sniff_schema() -> None: + os.environ["RAISE_ON_FAILED_JOBS"] = "false" + pipeline = dlt.pipeline( pipeline_name="test_jsonl_import", destination="duckdb", dev_mode=True, ) + + # table will not be found which is terminal exception import_file = "tests/load/cases/loading/header.jsonl" info = pipeline.run( [dlt.mark.with_file_import(import_file, "jsonl", 2)], diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py index 600baff1a0..0a2f8f3dd9 100644 --- a/tests/pipeline/test_pipeline_extra.py +++ b/tests/pipeline/test_pipeline_extra.py @@ -34,6 +34,8 @@ class BaseModel: # type: ignore[no-redef] from dlt.extract.storage import ExtractStorage from dlt.extract.validation import PydanticValidator +from dlt.destinations import dummy + from dlt.pipeline import TCollectorArg from tests.utils import TEST_STORAGE_ROOT @@ -41,6 +43,8 @@ class BaseModel: # type: ignore[no-redef] from tests.load.utils import DestinationTestConfiguration, destinations_configs from tests.pipeline.utils import assert_load_info, load_data_table_counts, many_delayed +DUMMY_COMPLETE = dummy(completed_prob=1) # factory set up to complete jobs + @pytest.mark.parametrize( "destination_config", @@ -76,6 +80,8 @@ def test_create_pipeline_all_destinations(destination_config: DestinationTestCon @pytest.mark.parametrize("progress", ["tqdm", "enlighten", "log", "alive_progress"]) def test_pipeline_progress(progress: TCollectorArg) -> 
None: + # do not raise on failed jobs + os.environ["RAISE_ON_FAILED_JOBS"] = "false" os.environ["TIMEOUT"] = "3.0" p = dlt.pipeline(destination="dummy", progress=progress) diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index 4178b9e632..9c12519a89 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -513,6 +513,8 @@ def test_trace_telemetry() -> None: SENTRY_SENT_ITEMS.clear() # make dummy fail all files os.environ["FAIL_PROB"] = "1.0" + # but do not raise exceptions + os.environ["RAISE_ON_FAILED_JOBS"] = "false" load_info = dlt.pipeline().run( [1, 2, 3], table_name="data", destination="dummy", dataset_name="data_data" ) diff --git a/tests/pipeline/test_schema_contracts.py b/tests/pipeline/test_schema_contracts.py index 4b46bb7c3e..bf48e347c2 100644 --- a/tests/pipeline/test_schema_contracts.py +++ b/tests/pipeline/test_schema_contracts.py @@ -733,7 +733,8 @@ def test_pydantic_contract_implementation(contract_setting: str, as_list: bool) from pydantic import BaseModel class Items(BaseModel): - id: int # noqa: A003 + # for variant test below we must allow id to be nullable + id: Optional[int] # noqa: A003 name: str def get_items(as_list: bool = False):