fixes some test cases that started to abort
rudolfix committed Sep 10, 2024
1 parent 9f4ff1e commit 74697d8
Showing 16 changed files with 75 additions and 44 deletions.
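
Most of the changes below follow one pattern: the load step now raises on failed jobs by default (which is why these tests started to abort), so each test either opts out by setting RAISE_ON_FAILED_JOBS to "false" and inspecting the failed jobs on the returned LoadInfo, or keeps the default and asserts on the raised PipelineStepFailed. A minimal sketch of the two styles, assuming dlt's dummy destination and the configuration names used in this diff (not code taken from the commit itself):

import os
import pytest
import dlt
from dlt.pipeline.exceptions import PipelineStepFailed

os.environ["FAIL_PROB"] = "1.0"  # dummy destination: every load job fails terminally

# style 1: let the package complete, then inspect failed jobs on LoadInfo
os.environ["RAISE_ON_FAILED_JOBS"] = "false"
pipeline = dlt.pipeline(pipeline_name="failing_jobs", destination="dummy")
info = pipeline.run([1, 2, 3], table_name="items")
assert info.has_failed_jobs
print(info.load_packages[0].jobs["failed_jobs"][0].failed_message)

# style 2: keep the (apparently new) default and expect the load step to fail
os.environ["RAISE_ON_FAILED_JOBS"] = "true"
with pytest.raises(PipelineStepFailed) as pip_ex:
    pipeline.run([4, 5, 6], table_name="items")
assert pip_ex.value.step_info.has_failed_jobs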
@@ -94,6 +94,7 @@ def destination_instantiation_snippet() -> None:
# here dependencies will be imported, secrets pulled and destination accessed
# we pass bucket_url explicitly and expect credentials passed by config provider
load_info = pipeline.load(destination=filesystem(bucket_url=bucket_url))
print(load_info)
# @@@DLT_SNIPPET_END late_destination_access


@@ -180,6 +180,8 @@ async def _run_async():
loop.run_in_executor(executor, _run_pipeline, pipeline_2, defer_table),
)
# results contains two LoadInfo instances
print("pipeline_1", results[0])
print("pipeline_2", results[1])

# load data
asyncio.run(_run_async())
2 changes: 1 addition & 1 deletion tests/cli/common/test_cli_invoke.py
@@ -63,7 +63,7 @@ def test_invoke_pipeline(script_runner: ScriptRunner) -> None:
shutil.copytree("tests/cli/cases/deploy_pipeline", TEST_STORAGE_ROOT, dirs_exist_ok=True)

with set_working_dir(TEST_STORAGE_ROOT):
with custom_environ({"COMPETED_PROB": "1.0", DLT_DATA_DIR: get_dlt_data_dir()}):
with custom_environ({"COMPLETED_PROB": "1.0", DLT_DATA_DIR: get_dlt_data_dir()}):
venv = Venv.restore_current()
venv.run_script("dummy_pipeline.py")
# we check output test_pipeline_command else
2 changes: 2 additions & 0 deletions tests/cli/test_pipeline_command.py
@@ -170,6 +170,8 @@ def test_pipeline_command_failed_jobs(repo_dir: str, project_files: FileStorage)

# now run the pipeline
os.environ["FAIL_PROB"] = "1.0"
# let it fail without an exception
os.environ["RAISE_ON_FAILED_JOBS"] = "false"
venv = Venv.restore_current()
try:
print(venv.run_script("chess_pipeline.py"))
12 changes: 9 additions & 3 deletions tests/load/bigquery/test_bigquery_streaming_insert.py
@@ -1,7 +1,10 @@
import pytest

import dlt
from dlt.common.pipeline import LoadInfo
from dlt.destinations.adapters import bigquery_adapter
from dlt.load.exceptions import LoadClientJobFailed
from dlt.pipeline.exceptions import PipelineStepFailed
from tests.pipeline.utils import assert_load_info


@@ -40,13 +43,16 @@ def test_resource():
test_resource.apply_hints(additional_table_hints={"x-insert-api": "streaming"})

pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery")
info = pipe.run(test_resource)
with pytest.raises(PipelineStepFailed) as pip_ex:
pipe.run(test_resource)
assert isinstance(pip_ex.value.step_info, LoadInfo)
assert pip_ex.value.step_info.has_failed_jobs
# pick the failed job
failed_job = info.load_packages[0].jobs["failed_jobs"][0]
assert isinstance(pip_ex.value.__cause__, LoadClientJobFailed)
assert (
"""BigQuery streaming insert can only be used with `append`"""
""" write_disposition, while the given resource has `merge`."""
) in failed_job.failed_message
) in pip_ex.value.__cause__.failed_message


def test_bigquery_streaming_nested_data():
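
Note how the failure surfaces in the rewritten assertions above: pip_ex.value.step_info is the partial LoadInfo for the aborted package, while __cause__ is the typed LoadClientJobFailed that carries failed_message. A hedged sketch of a hypothetical helper (not part of dlt or this commit) that collects failed-job messages from either a LoadInfo or the raised exception:

def failed_messages(info_or_exc):
    # PipelineStepFailed carries the partial LoadInfo on step_info; a LoadInfo is used as-is
    info = getattr(info_or_exc, "step_info", info_or_exc)
    return [
        job.failed_message
        for package in info.load_packages
        for job in package.jobs["failed_jobs"]
    ]

# usage against the exception raised above
assert any("BigQuery streaming insert" in msg for msg in failed_messages(pip_ex.value))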
8 changes: 7 additions & 1 deletion tests/load/conftest.py
@@ -2,7 +2,13 @@
import pytest
from typing import Iterator

from tests.load.utils import ALL_BUCKETS, DEFAULT_BUCKETS, WITH_GDRIVE_BUCKETS, drop_pipeline
from tests.load.utils import (
ALL_BUCKETS,
DEFAULT_BUCKETS,
WITH_GDRIVE_BUCKETS,
drop_pipeline,
empty_schema,
)
from tests.utils import preserve_environ, patch_home_dir


2 changes: 2 additions & 0 deletions tests/load/pipeline/test_csv_loading.py
@@ -113,6 +113,8 @@ def test_custom_csv_no_header(
ids=lambda x: x.name,
)
def test_custom_wrong_header(destination_config: DestinationTestConfiguration) -> None:
# do not raise on failed jobs
os.environ["RAISE_ON_FAILED_JOBS"] = "false"
csv_format = CsvFormatConfiguration(delimiter="|", include_header=True)
# apply to collected config
pipeline = destination_config.setup_pipeline("postgres_" + uniq_id(), dev_mode=True)
3 changes: 3 additions & 0 deletions tests/load/pipeline/test_databricks_pipeline.py
@@ -20,6 +20,9 @@
def test_databricks_external_location(destination_config: DestinationTestConfiguration) -> None:
# do not interfere with state
os.environ["RESTORE_FROM_DESTINATION"] = "False"
# let the package complete even with failed jobs
os.environ["RAISE_ON_FAILED_JOBS"] = "false"

dataset_name = "test_databricks_external_location" + uniq_id()

from dlt.destinations import databricks, filesystem
2 changes: 2 additions & 0 deletions tests/load/pipeline/test_pipelines.py
@@ -381,6 +381,8 @@ def extended_rows():

# let's violate the unique constraint on postgres; redshift and BQ ignore unique indexes
if destination_config.destination == "postgres":
# let it complete even with PK violation (which is a terminal error)
os.environ["RAISE_ON_FAILED_JOBS"] = "false"
assert p.dataset_name == dataset_name
err_info = p.run(
source(1).with_resources("simple_rows"),
17 changes: 9 additions & 8 deletions tests/load/pipeline/test_redshift.py
@@ -3,7 +3,9 @@
import pytest

import dlt
from dlt.common.destination.exceptions import UnsupportedDataType
from dlt.common.utils import uniq_id
from dlt.pipeline.exceptions import PipelineStepFailed
from tests.load.utils import destinations_configs, DestinationTestConfiguration
from tests.cases import table_update_and_row, assert_all_data_types_row
from tests.pipeline.utils import assert_load_info
@@ -32,11 +34,10 @@ def my_resource() -> Iterator[Any]:
def my_source() -> Any:
return my_resource

info = pipeline.run(my_source(), **destination_config.run_kwargs)

assert info.has_failed_jobs

assert (
"Redshift cannot load TIME columns from"
in info.load_packages[0].jobs["failed_jobs"][0].failed_message
)
with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(my_source(), **destination_config.run_kwargs)
assert isinstance(pip_ex.value.__cause__, UnsupportedDataType)
if destination_config.file_format == "parquet":
assert pip_ex.value.__cause__.data_type == "time"
else:
assert pip_ex.value.__cause__.data_type in ("time", "binary")
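
The Redshift test above follows the same shift: the unsupported columns no longer show up as failed jobs on LoadInfo; the load aborts with a typed UnsupportedDataType (imported from dlt.common.destination.exceptions above) whose data_type can be checked per file format. A small sketch of the same expectation written as a lookup, assuming the file format names used in these tests; per the asserts above, parquet is only expected to reject time:

expected = {"parquet": {"time"}}  # other formats may also reject binary columns
allowed = expected.get(destination_config.file_format, {"time", "binary"})
assert pip_ex.value.__cause__.data_type in allowed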
4 changes: 2 additions & 2 deletions tests/normalize/test_max_nesting.py
@@ -61,7 +61,7 @@ def bot_events():
pipeline_name = f"test_max_table_nesting_{nesting_level}_{expected_num_tables}"
pipeline = dlt.pipeline(
pipeline_name=pipeline_name,
destination=dummy(timeout=0.1),
destination=dummy(timeout=0.1, completed_prob=1),
dev_mode=True,
)

@@ -168,7 +168,7 @@ def some_data():
pipeline_name = "test_different_table_nesting_levels"
pipeline = dlt.pipeline(
pipeline_name=pipeline_name,
destination=dummy(timeout=0.1),
destination=dummy(timeout=0.1, completed_prob=1),
dev_mode=True,
)

2 changes: 2 additions & 0 deletions tests/pipeline/conftest.py
@@ -4,5 +4,7 @@
patch_home_dir,
wipe_pipeline,
duckdb_pipeline_location,
test_storage,
)
from tests.common.configuration.utils import environment, toml_providers
from tests.pipeline.utils import drop_dataset_from_env
51 changes: 23 additions & 28 deletions tests/pipeline/test_pipeline.py
@@ -64,6 +64,8 @@
many_delayed,
)

DUMMY_COMPLETE = dummy(completed_prob=1) # factory set up to complete jobs


def test_default_pipeline() -> None:
p = dlt.pipeline()
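
The DUMMY_COMPLETE factory above replaces the os.environ["COMPLETED_PROB"] = "1.0" trick removed throughout this file: rather than configuring the dummy destination through a process-wide environment variable, the tests now pass an already-configured destination factory to dlt.pipeline(). A minimal sketch of the idea, assuming dummy is the destination factory importable from dlt.destinations, as used elsewhere in this diff:

import dlt
from dlt.destinations import dummy

# configure completion via factory arguments instead of the COMPLETED_PROB env var
DUMMY_COMPLETE = dummy(completed_prob=1)  # every load job completes immediately

p = dlt.pipeline(pipeline_name="factory_configured", destination=DUMMY_COMPLETE)
info = p.run([1, 2, 3], table_name="items")
assert not info.has_failed_jobs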
@@ -634,10 +636,8 @@ def with_table_hints():


def test_restore_state_on_dummy() -> None:
os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately

pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
p.config.restore_from_destination = True
info = p.run([1, 2, 3], table_name="dummy_table")
print(info)
@@ -648,26 +648,24 @@ def test_restore_state_on_dummy() -> None:

# wipe out storage
p._wipe_working_folder()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
assert p.first_run is True
p.sync_destination()
assert p.first_run is True
assert p.state["_state_version"] == 0


def test_first_run_flag() -> None:
os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately

pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
assert p.first_run is True
# attach
p = dlt.attach(pipeline_name=pipeline_name)
assert p.first_run is True
p.extract([1, 2, 3], table_name="dummy_table")
assert p.first_run is True
# attach again
p = dlt.attach(pipeline_name=pipeline_name)
p = dlt.attach(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)
assert p.first_run is True
assert len(p.list_extracted_load_packages()) > 0
p.normalize()
Expand All @@ -688,7 +686,7 @@ def test_first_run_flag() -> None:


def test_has_pending_data_flag() -> None:
p = dlt.pipeline(pipeline_name="pipe_" + uniq_id(), destination="dummy")
p = dlt.pipeline(pipeline_name="pipe_" + uniq_id(), destination=DUMMY_COMPLETE)
assert p.has_pending_data is False
p.extract([1, 2, 3], table_name="dummy_table")
assert p.has_pending_data is True
@@ -701,11 +699,10 @@ def test_has_pending_data_flag() -> None:
def test_sentry_tracing() -> None:
import sentry_sdk

os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately
os.environ["RUNTIME__SENTRY_DSN"] = TEST_SENTRY_DSN

pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)

# def inspect_transaction(ctx):
# print(ctx)
@@ -802,7 +799,7 @@ def data_schema_3():

# new pipeline
pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)

with pytest.raises(PipelineStepFailed):
p.run([data_schema_1(), data_schema_2(), data_schema_3()], write_disposition="replace")
@@ -814,14 +811,12 @@ def data_schema_3():
assert len(p._schema_storage.list_schemas()) == 0
assert p.default_schema_name is None

os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately
p.run([data_schema_1(), data_schema_2()], write_disposition="replace")
assert set(p.schema_names) == set(p._schema_storage.list_schemas())


def test_run_with_table_name_exceeding_path_length() -> None:
pipeline_name = "pipe_" + uniq_id()
# os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately
p = dlt.pipeline(pipeline_name=pipeline_name)

# we must fix that
@@ -880,9 +875,8 @@ def test_load_info_raise_on_failed_jobs() -> None:

def test_run_load_pending() -> None:
# prepare some data and complete load with run
os.environ["COMPLETED_PROB"] = "1.0"
pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)

def some_data():
yield from [1, 2, 3]
@@ -911,9 +905,9 @@ def source():


def test_retry_load() -> None:
os.environ["COMPLETED_PROB"] = "1.0"
retry_count = 2

os.environ["COMPLETED_PROB"] = "1.0"
pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")

@@ -999,9 +993,8 @@ def _w_local_state():


def test_changed_write_disposition() -> None:
os.environ["COMPLETED_PROB"] = "1.0"
pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)

@dlt.resource
def resource_1():
@@ -1048,9 +1041,8 @@ def _get_shuffled_events(repeat: int = 1):

@pytest.mark.parametrize("github_resource", (github_repo_events_table_meta, github_repo_events))
def test_dispatch_rows_to_tables(github_resource: DltResource):
os.environ["COMPLETED_PROB"] = "1.0"
pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)

info = p.run(_get_shuffled_events | github_resource)
assert_load_info(info)
@@ -1096,7 +1088,7 @@ def some_source():
return [static_data(), dynamic_func_data(), dynamic_mark_data(), nested_data()]

source = some_source()
p = dlt.pipeline(pipeline_name=uniq_id(), destination="dummy")
p = dlt.pipeline(pipeline_name=uniq_id(), destination=DUMMY_COMPLETE)
p.run(source)

schema = p.default_schema
@@ -1378,8 +1370,6 @@ def test_emojis_resource_names() -> None:


def test_apply_hints_infer_hints() -> None:
os.environ["COMPLETED_PROB"] = "1.0"

@dlt.source
def infer():
yield dlt.resource(
@@ -1391,7 +1381,7 @@ def infer():
new_new_hints = {"not_null": ["timestamp"], "primary_key": ["id"]}
s = infer()
s.schema.merge_hints(new_new_hints) # type: ignore[arg-type]
pipeline = dlt.pipeline(pipeline_name="inf", destination="dummy")
pipeline = dlt.pipeline(pipeline_name="inf", destination=DUMMY_COMPLETE)
pipeline.run(s)
# check schema
table = pipeline.default_schema.get_table("table1")
@@ -1440,7 +1430,7 @@ def test_invalid_data_edge_cases() -> None:
def my_source():
return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)

pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy")
pipeline = dlt.pipeline(pipeline_name="invalid", destination=DUMMY_COMPLETE)
with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(my_source)
assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
@@ -1463,7 +1453,7 @@ def res_return():
def my_source_yield():
yield dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)

pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy")
pipeline = dlt.pipeline(pipeline_name="invalid", destination=DUMMY_COMPLETE)
with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(my_source_yield)
assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
@@ -1736,6 +1726,7 @@ def test_pipeline_list_packages() -> None:
assert normalized_package.state == "normalized"
assert len(normalized_package.jobs["new_jobs"]) == len(extracted_package.jobs["new_jobs"])
# load all 3 packages and fail all jobs in them
os.environ["RAISE_ON_FAILED_JOBS"] = "false" # do not raise, complete package till the end
os.environ["FAIL_PROB"] = "1.0"
pipeline.load()
load_ids_l = pipeline.list_completed_load_packages()
@@ -1748,7 +1739,7 @@ def test_pipeline_list_packages() -> None:


def test_remove_pending_packages() -> None:
pipeline = dlt.pipeline(pipeline_name="emojis", destination="dummy")
pipeline = dlt.pipeline(pipeline_name="emojis", destination=DUMMY_COMPLETE)
pipeline.extract(airtable_emojis())
assert pipeline.has_pending_data
pipeline.drop_pending_packages()
@@ -2490,11 +2481,15 @@ def test_import_jsonl_file() -> None:


def test_import_file_without_sniff_schema() -> None:
os.environ["RAISE_ON_FAILED_JOBS"] = "false"

pipeline = dlt.pipeline(
pipeline_name="test_jsonl_import",
destination="duckdb",
dev_mode=True,
)

# table will not be found, which is a terminal exception
import_file = "tests/load/cases/loading/header.jsonl"
info = pipeline.run(
[dlt.mark.with_file_import(import_file, "jsonl", 2)],