Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/1749 abort load package and raise exception on terminal errors in jobs #1781

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dlt/destinations/impl/databricks/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _make_database_exception(ex: Exception) -> Exception:
return DatabaseTransientException(ex)
return DatabaseTerminalException(ex)
elif isinstance(ex, databricks_lib.OperationalError):
return DatabaseTerminalException(ex)
return DatabaseTransientException(ex)
elif isinstance(ex, (databricks_lib.ProgrammingError, databricks_lib.IntegrityError)):
return DatabaseTerminalException(ex)
elif isinstance(ex, databricks_lib.DatabaseError):
Expand Down
24 changes: 8 additions & 16 deletions dlt/helpers/airflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ def __init__(
buffer_max_items: int = 1000,
retry_policy: Retrying = DEFAULT_RETRY_NO_RETRY,
retry_pipeline_steps: Sequence[TPipelineStep] = ("load",),
fail_task_if_any_job_failed: bool = True,
abort_task_if_any_job_failed: bool = False,
abort_task_if_any_job_failed: bool = True,
wipe_local_data: bool = True,
save_load_info: bool = False,
save_trace_info: bool = False,
Expand All @@ -82,11 +81,7 @@ def __init__(
The `data_folder` is available in certain Airflow deployments. In case of Composer, it is a location on the gcs bucket. `use_data_folder` is disabled and should be
enabled only when needed. The operations on bucket are non-atomic and way slower than on local storage and should be avoided.

`fail_task_if_any_job_failed` will raise an exception if any of the loading jobs failed permanently and thus fail the current Airflow task.
This happens **after all dlt loading jobs executed**. See more here: https://dlthub.com/docs/running-in-production/running#failed-jobs

`abort_task_if_any_job_failed` will abort the other dlt loading jobs and fail the Airflow task in any of the jobs failed. This may put your warehouse in
inconsistent state so the option is disabled by default.
`abort_task_if_any_job_failed` will abort the other dlt loading jobs and fail the Airflow task in any of the jobs failed. See https://dlthub.com/docs/running-in-production/running#handle-exceptions-failed-jobs-and-retry-the-pipeline.

The load info and trace info can be optionally saved to the destination. See https://dlthub.com/docs/running-in-production/running#inspect-and-save-the-load-info-and-trace

Expand All @@ -99,7 +94,6 @@ def __init__(
buffer_max_items (int, optional): Maximum number of buffered items. Use 0 to keep dlt built-in limit. Defaults to 1000.
retry_policy (_type_, optional): Tenacity retry policy. Defaults to no retry.
retry_pipeline_steps (Sequence[TPipelineStep], optional): Which pipeline steps are eligible for retry. Defaults to ("load", ).
fail_task_if_any_job_failed (bool, optional): Will fail a task if any of the dlt load jobs failed. Defaults to True.
wipe_local_data (bool, optional): Will wipe all the data created by pipeline, also in case of exception. Defaults to False.
save_load_info (bool, optional): Will save extensive load info to the destination. Defaults to False.
save_trace_info (bool, optional): Will save trace info to the destination. Defaults to False.
Expand All @@ -112,7 +106,6 @@ def __init__(
self.buffer_max_items = buffer_max_items
self.retry_policy = retry_policy
self.retry_pipeline_steps = retry_pipeline_steps
self.fail_task_if_any_job_failed = fail_task_if_any_job_failed
self.abort_task_if_any_job_failed = abort_task_if_any_job_failed
self.wipe_local_data = wipe_local_data
self.save_load_info = save_load_info
Expand Down Expand Up @@ -270,10 +263,11 @@ def _run(
dlt.config["data_writer.buffer_max_items"] = self.buffer_max_items
logger.info(f"Set data_writer.buffer_max_items to {self.buffer_max_items}")

# enable abort package if job failed
if self.abort_task_if_any_job_failed:
dlt.config["load.raise_on_failed_jobs"] = True
logger.info("Set load.abort_task_if_any_job_failed to True")
if self.abort_task_if_any_job_failed is not None:
dlt.config["load.raise_on_failed_jobs"] = self.abort_task_if_any_job_failed
logger.info(
"Set load.abort_task_if_any_job_failed to {self.abort_task_if_any_job_failed}"
)

if self.log_progress_period > 0 and task_pipeline.collector == NULL_COLLECTOR:
task_pipeline.collector = log(log_period=self.log_progress_period, logger=logger.LOGGER)
Expand Down Expand Up @@ -329,9 +323,7 @@ def log_after_attempt(retry_state: RetryCallState) -> None:
table_name="_trace",
loader_file_format=loader_file_format,
)
# raise on failed jobs if requested
if self.fail_task_if_any_job_failed:
load_info.raise_on_failed_jobs()

finally:
# always completely wipe out pipeline folder, in case of success and failure
if self.wipe_local_data:
Expand Down
2 changes: 1 addition & 1 deletion dlt/load/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class LoaderConfiguration(PoolRunnerConfiguration):
parallelism_strategy: Optional[TLoaderParallelismStrategy] = None
"""Which parallelism strategy to use at load time"""
pool_type: TPoolType = "thread" # mostly i/o (upload) so may be thread pool
raise_on_failed_jobs: bool = False
raise_on_failed_jobs: bool = True
"""when True, raises on terminally failed jobs immediately"""
raise_on_max_retries: int = 5
"""When gt 0 will raise when job reaches raise_on_max_retries"""
Expand Down
5 changes: 1 addition & 4 deletions dlt/pipeline/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
Sequence,
Iterable,
Optional,
Any,
Dict,
Union,
TYPE_CHECKING,
)

from dlt.common.jsonpath import TAnyJsonPath
from dlt.common.exceptions import TerminalException
from dlt.common.schema.schema import Schema
from dlt.common.schema.typing import TSimpleRegex
from dlt.common.pipeline import pipeline_state as current_pipeline_state, TRefreshMode
from dlt.common.storages.load_package import TLoadPackageDropTablesState
Expand Down Expand Up @@ -139,7 +136,7 @@ def __call__(self) -> None:

self.pipeline.normalize()
try:
self.pipeline.load(raise_on_failed_jobs=True)
self.pipeline.load()
except Exception:
# Clear extracted state on failure so command can run again
self.pipeline.drop_pending_packages()
Expand Down
2 changes: 1 addition & 1 deletion dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ def load(
credentials: Any = None,
*,
workers: int = 20,
raise_on_failed_jobs: bool = False,
raise_on_failed_jobs: bool = ConfigValue,
) -> LoadInfo:
"""Loads the packages prepared by `normalize` method into the `dataset_name` at `destination`, optionally using provided `credentials`"""
# set destination and default dataset if provided (this is the reason we have state sync here)
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Note: All paths in this guide are relative to the `dlt` repository directory.
- Update the doc string which will compromise the generated markdown file, check the other examples how it is done
- If your example requires any secrets, add the vars to the example.secrects.toml but do not enter the values.
- Add your example code, make sure you have a `if __name__ = "__main__"` clause in which you run the example script, this will be used for testing
- You should add one or two assertions after running your example and maybe also `load_info.raise_on_failed_jobs()`, this will help greatly with testing
- You should add one or two assertions after running your example

## Testing
- You can test your example simply by running your example script from your example folder. On CI a test will be automatically generated.
Expand All @@ -31,4 +31,4 @@ If you use any secrets for the code snippets, e.g. Zendesk requires credentials.
If your example requires any additional dependency, then you can add it

- To `pyproject.toml` in the `[tool.poetry.group.docs.dependencies]` section.
- Do not forget to update your `poetry.lock` file with `poetry lock --no-update` command and commit.
- Do not forget to update your `poetry.lock` file with `poetry lock --no-update` command and commit.
3 changes: 0 additions & 3 deletions docs/examples/_template/_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,3 @@
# Extract, normalize, and load the data
load_info = pipeline.run([1, 2, 3], table_name="player")
print(load_info)

# make sure nothing failed
load_info.raise_on_failed_jobs()
3 changes: 0 additions & 3 deletions docs/examples/chess/chess.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,3 @@ def players_games(username: Any) -> Iterator[TDataItems]:
).run(chess(max_players=5, month=9))
# display where the data went
print(load_info)

# make sure nothing failed
load_info.raise_on_failed_jobs()
7 changes: 1 addition & 6 deletions docs/examples/chess_production/chess_production.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ def load_data_with_retry(pipeline, data):
load_info = pipeline.run(data)
logger.info(str(load_info))

# raise on failed jobs
load_info.raise_on_failed_jobs()
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
Expand Down Expand Up @@ -169,7 +167,4 @@ def load_data_with_retry(pipeline, data):
)
# get data for a few famous players
data = chess(max_players=MAX_PLAYERS)
load_info = load_data_with_retry(pipeline, data)

# make sure nothing failed
load_info.raise_on_failed_jobs()
load_data_with_retry(pipeline, data)
3 changes: 0 additions & 3 deletions docs/examples/connector_x_arrow/connector_x_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,3 @@ def genome_resource():
# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["genome"] == 1000

# make sure nothing failed
load_info.raise_on_failed_jobs()
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,3 @@ def bigquery_insert(
load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))

print(load_info)

# make sure nothing failed
load_info.raise_on_failed_jobs()
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
__source_name__ = "spotify"

import datetime # noqa: I251
import os
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -142,7 +141,6 @@ def lancedb_destination(items: TDataItems, table: TTableSchema) -> None:
)

load_info = pipeline.run(spotify_shows())
load_info.raise_on_failed_jobs()
print(load_info)

row_counts = pipeline.last_trace.last_normalize_info
Expand Down
4 changes: 0 additions & 4 deletions docs/examples/custom_naming/custom_naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
# Extract, normalize, and load the data
load_info = pipeline.run([{"StückId": 1}], table_name="Ausrüstung")
print(load_info)
# make sure nothing failed
load_info.raise_on_failed_jobs()
with pipeline.sql_client() as client:
# NOTE: we quote case sensitive identifers
with client.execute_query('SELECT "StückId" FROM "Ausrüstung"') as cur:
Expand All @@ -66,12 +64,10 @@
# duckdb is case insensitive so tables and columns below would clash but sql_ci_no_collision prevents that
data_1 = {"ItemID": 1, "itemid": "collides"}
load_info = pipeline.run([data_1], table_name="BigData")
load_info.raise_on_failed_jobs()

data_2 = {"1Data": 1, "_1data": "collides"}
# use colliding table
load_info = pipeline.run([data_2], table_name="bigdata")
load_info.raise_on_failed_jobs()

with pipeline.sql_client() as client:
from duckdb import DuckDBPyConnection
Expand Down
3 changes: 0 additions & 3 deletions docs/examples/google_sheets/google_sheets.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,3 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
print(row_counts.keys())
assert row_counts["hidden_columns_merged_cells"] == 7
assert row_counts["blank_columns"] == 21

# make sure nothing failed
load_info.raise_on_failed_jobs()
3 changes: 0 additions & 3 deletions docs/examples/incremental_loading/incremental_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,3 @@ def get_pages(
# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["ticket_events"] == 17

# make sure nothing failed
load_info.raise_on_failed_jobs()
9 changes: 0 additions & 9 deletions docs/examples/nested_data/nested_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ def convert_mongo_objs(value: Any) -> Any:
tables.pop("_dlt_pipeline_state")
assert len(tables) == 7, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()

# The second method involves setting the max_table_nesting attribute directly
# on the source data object.
# This allows for dynamic control over the maximum nesting
Expand All @@ -149,9 +146,6 @@ def convert_mongo_objs(value: Any) -> Any:
tables.pop("_dlt_pipeline_state")
assert len(tables) == 1, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()

# The third method involves applying data type hints to specific columns in the data.
# In this case, we tell dlt that column 'cast' (containing a list of actors)
# in 'movies' table should have type complex which means
Expand All @@ -168,6 +162,3 @@ def convert_mongo_objs(value: Any) -> Any:
tables = pipeline.last_trace.last_normalize_info.row_counts
tables.pop("_dlt_pipeline_state")
assert len(tables) == 6, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
keywords: [parent child relationship, parent key]
---

This example demonstrates handling data with parent-child relationships using
the `dlt` library. You learn how to integrate specific fields (e.g., primary,
foreign keys) from a parent record into each child record.
This example demonstrates handling data with parent-child relationships using the `dlt` library.
You learn how to integrate specific fields (e.g., primary, foreign keys) from a parent record into each child record.

In this example, we'll explore how to:

Expand Down
3 changes: 0 additions & 3 deletions docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,3 @@ def pdf_to_text(file_item, separate_pages: bool = False):
client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())

# make sure nothing failed
load_info.raise_on_failed_jobs()
3 changes: 0 additions & 3 deletions docs/examples/postgres_to_postgres/postgres_to_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,6 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
assert row_counts["table_1"] == 9
assert row_counts["table_2"] == 9

# make sure nothing failed
load_info.raise_on_failed_jobs()

if load_type == "replace":
# 4. Load DuckDB local database into Postgres
print("##################################### START DUCKDB LOAD ########")
Expand Down
6 changes: 0 additions & 6 deletions docs/examples/qdrant_zendesk/qdrant_zendesk.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,6 @@ def get_pages(

print(load_info)

# make sure nothing failed
load_info.raise_on_failed_jobs()

# getting the authenticated Qdrant client to connect to your Qdrant database
with pipeline.destination_client() as destination_client:
from qdrant_client import QdrantClient
Expand All @@ -194,6 +191,3 @@ def get_pages(
)

assert len(response) <= 3 and len(response) > 0

# make sure nothing failed
load_info.raise_on_failed_jobs()
3 changes: 0 additions & 3 deletions docs/examples/transformers/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,3 @@ def species(pokemon_details):
assert row_counts["pokemon"] == 20
assert row_counts["species"] == 20
assert "pokemon_list" not in row_counts

# make sure nothing failed
load_info.raise_on_failed_jobs()
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def destination_instantiation_snippet() -> None:
# here dependencies dependencies will be imported, secrets pulled and destination accessed
# we pass bucket_url explicitly and expect credentials passed by config provider
load_info = pipeline.load(destination=filesystem(bucket_url=bucket_url))
load_info.raise_on_failed_jobs()
print(load_info)
# @@@DLT_SNIPPET_END late_destination_access


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ async def _run_async():
loop.run_in_executor(executor, _run_pipeline, pipeline_1, async_table),
loop.run_in_executor(executor, _run_pipeline, pipeline_2, defer_table),
)
# result contains two LoadInfo instances
results[0].raise_on_failed_jobs()
results[1].raise_on_failed_jobs()
# results contains two LoadInfo instances
willi-mueller marked this conversation as resolved.
Show resolved Hide resolved
print("pipeline_1", results[0])
print("pipeline_2", results[1])

# load data
asyncio.run(_run_async())
Expand Down
28 changes: 13 additions & 15 deletions docs/website/docs/running-in-production/running.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,19 @@ def check(ex: Exception):

### Failed jobs

If any job in the package **fail terminally** it will be moved to `failed_jobs` folder and assigned
such status. By default **no exception is raised** and other jobs will be processed and completed.
You may inspect if the failed jobs are present by checking the load info as follows:
If any job in the package **fails terminally** it will be moved to `failed_jobs` folder and assigned
such status.
By default, **an exceptions is raised** and on the first failed job, the load package will be aborted with `LoadClientJobFailed` (terminal exception).
Such package will be completed but its load id is not added to the `_dlt_loads` table.
All the jobs that were running in parallel are completed before raising. The dlt state, if present, will not be visible to `dlt`.
Here is an example `config.toml` to disable this behavior:

```toml
# I hope you know what you are doing by setting this to false
load.raise_on_failed_jobs=false
```

If you prefer dlt to to not raise a terminal exception on failed jobs then you can manually check for failed jobs and raise an exception by checking the load info as follows:

```py
# returns True if there are failed jobs in any of the load packages
Expand All @@ -270,18 +280,6 @@ print(load_info.has_failed_jobs)
load_info.raise_on_failed_jobs()
```

You may also abort the load package with `LoadClientJobFailed` (terminal exception) on a first
failed job. Such package is will be completed but its load id is not added to the
`_dlt_loads` table. All the jobs that were running in parallel are completed before raising. The dlt
state, if present, will not be visible to `dlt`. Here's example `config.toml` to enable this option:

```toml
# you should really load just one job at a time to get the deterministic behavior
load.workers=1
# I hope you know what you are doing by setting this to true
load.raise_on_failed_jobs=true
```

:::caution
Note that certain write dispositions will irreversibly modify your data
1. `replace` write disposition with the default `truncate-and-insert` [strategy](../general-usage/full-loading.md) will truncate tables before loading.
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/common/test_cli_invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_invoke_pipeline(script_runner: ScriptRunner) -> None:
shutil.copytree("tests/cli/cases/deploy_pipeline", TEST_STORAGE_ROOT, dirs_exist_ok=True)

with set_working_dir(TEST_STORAGE_ROOT):
with custom_environ({"COMPETED_PROB": "1.0", DLT_DATA_DIR: get_dlt_data_dir()}):
with custom_environ({"COMPLETED_PROB": "1.0", DLT_DATA_DIR: get_dlt_data_dir()}):
venv = Venv.restore_current()
venv.run_script("dummy_pipeline.py")
# we check output test_pipeline_command else
Expand Down
2 changes: 2 additions & 0 deletions tests/cli/test_pipeline_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ def test_pipeline_command_failed_jobs(repo_dir: str, project_files: FileStorage)

# now run the pipeline
os.environ["FAIL_PROB"] = "1.0"
# let it fail without an exception
os.environ["RAISE_ON_FAILED_JOBS"] = "false"
venv = Venv.restore_current()
try:
print(venv.run_script("chess_pipeline.py"))
Expand Down
1 change: 1 addition & 0 deletions tests/destinations/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
wipe_pipeline,
duckdb_pipeline_location,
)
from tests.common.configuration.utils import environment
Loading
Loading